GEODE-8152, GEODE-8153: replace ACE sockets with boost/asio (#697)
Co-authored-by: Matthew Reddington <mreddington@pivotal.io>
diff --git a/cppcache/CMakeLists.txt b/cppcache/CMakeLists.txt
index 0e4126e..1c11487 100644
--- a/cppcache/CMakeLists.txt
+++ b/cppcache/CMakeLists.txt
@@ -24,6 +24,11 @@
check_function_exists("pthread_setname_np" HAVE_pthread_setname_np)
endif()
+# Search OpenSSL
+find_package(OpenSSL COMPONENTS Crypto)
+
+include_directories(${OPENSSL_INCLUDE_DIRS})
+
set(COMMON_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src)
set(COMMON_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/include)
@@ -89,6 +94,8 @@
Boost::thread
Boost::stacktrace
XercesC::XercesC
+ OpenSSL::SSL
+ OpenSSL::Crypto
)
if (USE_PCH)
diff --git a/cppcache/integration-test/DeltaEx.hpp b/cppcache/integration-test/DeltaEx.hpp
index f1f4de2..783d64a 100644
--- a/cppcache/integration-test/DeltaEx.hpp
+++ b/cppcache/integration-test/DeltaEx.hpp
@@ -68,7 +68,7 @@
int32_t val = in.readInt32();
if (fromDeltaCount == 1) {
fromDeltaCount++;
- LOG("Invalid Delta expetion thrown");
+ LOG("Invalid Delta exception thrown");
throw InvalidDeltaException("aaannn");
}
counter += val;
@@ -120,7 +120,7 @@
int32_t val = in.readInt32();
if (m_fromDeltaCount == 1) {
m_fromDeltaCount++;
- LOG("Invalid Delta expetion thrown");
+ LOG("Invalid Delta exception thrown");
throw InvalidDeltaException("aaannn");
}
m_counter += val;
diff --git a/cppcache/integration-test/fw_dunit.cpp b/cppcache/integration-test/fw_dunit.cpp
index 82b7395..257a1f2 100644
--- a/cppcache/integration-test/fw_dunit.cpp
+++ b/cppcache/integration-test/fw_dunit.cpp
@@ -106,7 +106,7 @@
ACE::timestamp(timestamp, sizeof timestamp);
// timestamp is like "Tue May 17 2005 12:54:22.546780"
// for our purpose we just want "12:54:22.546780"
- strncpy(bufPtr, ×tamp[16], sizeOfBuf);
+ strncpy(bufPtr, ×tamp[0], sizeOfBuf);
}
// some common values..
diff --git a/cppcache/integration-test/testXmlCacheCreationWithPools.cpp b/cppcache/integration-test/testXmlCacheCreationWithPools.cpp
index 6a779e6..6f7ef01 100644
--- a/cppcache/integration-test/testXmlCacheCreationWithPools.cpp
+++ b/cppcache/integration-test/testXmlCacheCreationWithPools.cpp
@@ -26,8 +26,6 @@
#include "fw_dunit.hpp"
#define CLIENT1 s1p1
-#define SERVER1 s2p1
-#define SERVER2 s2p2
#include "CacheHelper.hpp"
@@ -45,8 +43,6 @@
const char *locatorsG =
CacheHelper::getLocatorHostPort(isLocator, isLocalServer, numberOfLocators);
-#include "LocatorHelper.hpp"
-
using std::string;
using std::vector;
@@ -406,7 +402,11 @@
if (!cptr->isClosed()) {
cptr->close();
- cptr = nullptr;
+ // Do not set it to null because the destructor will be invoked here and
+ // the regions and pools previously obtained, that will be deleted when the
+ // function returns, will make use of the their reference to the deleted
+ // cache and thus make the process crash.
+ // cptr = nullptr;
}
if (!check1 || !check2 || !check3) {
@@ -420,8 +420,7 @@
std::string filePath = "invalid_cache_pool.xml";
std::string duplicateFile;
CacheHelper::createDuplicateXMLFile(duplicateFile, filePath);
- cptr = std::make_shared<Cache>(
- cacheFactory.set("cache-xml-file", duplicateFile).create());
+ Cache cache = cacheFactory.set("cache-xml-file", duplicateFile).create();
return -1;
} catch (Exception &ex) {
std::cout << "EXPECTED EXCEPTION" << std::endl;
@@ -434,8 +433,7 @@
std::string filePath = "invalid_cache_pool2.xml";
std::string duplicateFile;
CacheHelper::createDuplicateXMLFile(duplicateFile, filePath);
- cptr = std::make_shared<Cache>(
- cacheFactory.set("cache-xml-file", duplicateFile).create());
+ Cache cache = cacheFactory.set("cache-xml-file", duplicateFile).create();
return -1;
} catch (Exception &ex) {
std::cout << "EXPECTED EXCEPTION" << std::endl;
@@ -448,8 +446,7 @@
std::string filePath = "invalid_cache_pool3.xml";
std::string duplicateFile;
CacheHelper::createDuplicateXMLFile(duplicateFile, filePath);
- cptr = std::make_shared<Cache>(
- cacheFactory.set("cache-xml-file", duplicateFile).create());
+ Cache cache = cacheFactory.set("cache-xml-file", duplicateFile).create();
return -1;
} catch (Exception &ex) {
std::cout << "EXPECTED EXCEPTION" << std::endl;
@@ -462,8 +459,7 @@
std::string filePath = "invalid_cache_pool4.xml";
std::string duplicateFile;
CacheHelper::createDuplicateXMLFile(duplicateFile, filePath);
- cptr = std::make_shared<Cache>(
- cacheFactory.set("cache-xml-file", duplicateFile).create());
+ Cache cache = cacheFactory.set("cache-xml-file", duplicateFile).create();
return -1;
} catch (Exception &ex) {
std::cout << "EXPECTED EXCEPTION" << std::endl;
@@ -471,15 +467,6 @@
LOG(ex.getStackTrace());
}
- std::cout << "disconnecting..." << std::endl;
- try {
- std::cout << "just before disconnecting..." << std::endl;
- if (cptr != nullptr) cptr->close();
- } catch (Exception &ex) {
- std::cout << "Exception: msg = " << ex.what() << std::endl;
- LOG(ex.getStackTrace());
- return -1;
- }
std::cout << "done with test" << std::endl;
std::cout << "Test successful!" << std::endl;
return 0;
diff --git a/cppcache/integration/test/SslOneWayTest.cpp b/cppcache/integration/test/SslOneWayTest.cpp
index d7c10f1..97fc923 100644
--- a/cppcache/integration/test/SslOneWayTest.cpp
+++ b/cppcache/integration/test/SslOneWayTest.cpp
@@ -80,7 +80,7 @@
(clientSslKeysDir /
boost::filesystem::path("client_truststore_chained_root.pem"));
auto cache = CacheFactory()
- .set("log-level", "DEBUG")
+ .set("log-level", "none")
.set("ssl-enabled", "true")
.set("ssl-truststore", clientTruststore.string())
.create();
diff --git a/cppcache/integration/test/SslTwoWayTest.cpp b/cppcache/integration/test/SslTwoWayTest.cpp
index 4cb1b41..877d3ae 100644
--- a/cppcache/integration/test/SslTwoWayTest.cpp
+++ b/cppcache/integration/test/SslTwoWayTest.cpp
@@ -15,6 +15,7 @@
* limitations under the License.
*/
+#include <iostream>
#include <thread>
#include <gtest/gtest.h>
@@ -83,7 +84,8 @@
(clientSslKeysDir /
boost::filesystem::path("client_truststore_chained_root.pem"));
auto cache = CacheFactory()
- .set("log-level", "DEBUG")
+ .set("log-level", "debug")
+ .set("log-file", "./gemfire.log")
.set("ssl-enabled", "true")
.set("ssl-keystore", clientKeystore.string())
.set("ssl-keystore-password", certificatePassword)
@@ -99,8 +101,29 @@
.setPoolName("pool")
.create("region");
- region->put("1", "one");
+ try {
+ region->put("1", "one");
+ } catch (Exception& ex) {
+ std::cout << ex.getStackTrace();
+ }
+ std::shared_ptr<apache::geode::client::Cacheable> value;
+ try {
+ value = region->get("1");
+ } catch (Exception& ex) {
+ std::cout << ex.getStackTrace();
+ }
+
+ EXPECT_TRUE(value);
+
+ auto string_value =
+ std::dynamic_pointer_cast<apache::geode::client::CacheableString>(value);
+
+ EXPECT_TRUE(string_value);
+
+ EXPECT_EQ(string_value->value(), "one");
+
+ std::cout << "Read " << string_value->value() << " from the server.";
cache.close();
}
@@ -185,7 +208,7 @@
boost::filesystem::path("client_truststore_chained_root.pem"));
auto cache = CacheFactory()
- .set("log-level", "DEBUG")
+ .set("log-level", "none")
.set("ssl-enabled", "true")
.set("ssl-keystore", clientCorruptKeystore.string())
.set("ssl-keystore-password", certificatePassword)
diff --git a/cppcache/integration/test/ThinClientConflation.cpp b/cppcache/integration/test/ThinClientConflation.cpp
new file mode 100644
index 0000000..26b9347
--- /dev/null
+++ b/cppcache/integration/test/ThinClientConflation.cpp
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <framework/Cluster.h>
+#include <framework/Framework.h>
+#include <framework/Gfsh.h>
+
+#include <gtest/gtest.h>
+
+class ThinClient : ::testing::Test {
+ class OperMonitor : public CacheListener {
+ int m_events;
+ int m_value;
+
+ void check(const EntryEvent &event) {
+ char buf[256] = {'\0'};
+ m_events++;
+ auto keyPtr = std::dynamic_pointer_cast<CacheableString>(event.getKey());
+ auto valuePtr =
+ std::dynamic_pointer_cast<CacheableInt32>(event.getNewValue());
+
+ if (valuePtr != nullptr) {
+ m_value = valuePtr->value();
+ }
+ sprintf(buf, "Key = %s, Value = %d", keyPtr->toString().c_str(),
+ valuePtr->value());
+ LOG(buf);
+ }
+
+ public:
+ OperMonitor() : m_events(0), m_value(0) {}
+ ~OperMonitor() {}
+
+ virtual void afterCreate(const EntryEvent &event) { check(event); }
+
+ virtual void afterUpdate(const EntryEvent &event) { check(event); }
+
+ void validate(bool conflation) {
+ LOG("validate called");
+ char buf[256] = {'\0'};
+
+ if (conflation) {
+ sprintf(buf, "Conflation On: Expected events = 2, Actual = %d",
+ m_events);
+ ASSERT(m_events == 2, buf);
+ } else {
+ sprintf(buf, "Conflation Off: Expected events = 5, Actual = %d",
+ m_events);
+ ASSERT(m_events == 5, buf);
+ }
+ sprintf(buf, "Expected Value = 5, Actual = %d", m_value);
+ ASSERT(m_value == 5, buf);
+ }
+ };
+
+ void configure_pool(apache::geode::client::Cache &cache) {
+ auto poolFactory = cache.getPoolManager()
+ .createFactory()
+ .setSubscriptionRedundancy(0)
+ .setSubscriptionEnabled(true)
+ .setSubscriptionAckInterval(std::chrono::seconds(1));
+
+ cluster_.applyLocators(poolFactory);
+
+ poolFactory.create("__TESTPOOL1_");
+ }
+
+ void configure_regions(apache::geode::client::Cache &cache) {
+ auto conflated_regionFactory =
+ conflated_cache_.createRegionFactory(RegionShortcut::CACHING_PROXY)
+ .setPoolName("__TESTPOOL1_");
+
+ auto conflated = regionFactory.create("ConflatedRegion");
+ auto non_conflated = regionFactory.create("NonConflatedRegion");
+
+ conflated->getAttributesMutator()->setCacheListener(
+ std::make_shared<OperMonitor>());
+ non - conflated->getAttributesMutator()->setCacheListener(
+ std::make_shared<OperMonitor>());
+ }
+
+ void configure_cache(apache::geode::client::Cache &cache) {
+ configure_pool(cache);
+ configure_regions(cache);
+ }
+
+ protected:
+ Cluster cluster_{LocatorCount{1}, ServerCount{1}};
+ apache::geode::client::Cache conflated_cache_{
+ apache::geode::client::CacheFactory()
+ .set("log-level", "none")
+ .set("statistic-sampling-enabled", "false")
+ .set("durable-client-id", "DurableId1")
+ .set("durable-timeout", std::chrono::seconds(300))
+ .set("conflate-events", "true")
+ .create()},
+ non_conflated_cache_{
+ apache::geode::client::CacheFactory()
+ .set("log-level", "none")
+ .set("statistic-sampling-enabled", "false")
+ .set("durable-client-id", "DurableId2")
+ .set("durable-timeout", std::chrono::seconds(300))
+ .set("conflate-events", "false")
+ .create()},
+ feeder_cache_{apache::geode::client::CacheFactory()
+ .set("log-level", "none")
+ .set("statistic-sampling-enabled", "false")
+ .set("durable-client-id", "DurableId1")
+ .set("durable-timeout", std::chrono::seconds(300))
+ .set("conflate-events", "true")
+ .create()};
+
+ ThinClient() {
+ cluster_.start();
+
+ cluster_.getGfsh()
+ .create()
+ .region()
+ .withName("ConflatedRegion")
+ .withType("REPLICATE")
+ .execute();
+
+ cluster_.getGfsh()
+ .create()
+ .region()
+ .withName("NonConflatedRegion")
+ .withType("REPLICATE")
+ .execute();
+
+ configure_cache(conflated_cache_);
+ configure_cache(non_conflated_cache_);
+ configure_cache(feeder_cache_);
+ }
+};
+
+TEST_F(ThinClient, Conflation) {
+ auto conflated = feeder_cache_.getRegion("ConflatedRegion");
+
+ for(auto i = 1; i < 6; ++i) {
+ conflated.put(std::string("Key-").append(std::to_string(i)), i);
+ }
+
+ auto non_conflated = feeder_cache_.getRegion("NonConflatedRegion");
+
+ for(auto i = 1; i < 6; ++i) {
+ non_conflated.put(std::string("Key-").append(std::to_string(i)), i);
+ }
+}
\ No newline at end of file
diff --git a/cppcache/src/CacheImpl.cpp b/cppcache/src/CacheImpl.cpp
index 1b5e6a2..12fc785 100644
--- a/cppcache/src/CacheImpl.cpp
+++ b/cppcache/src/CacheImpl.cpp
@@ -161,7 +161,9 @@
}
void CacheImpl::removeRegion(const std::string& name) {
+ LOGDEBUG("recursive lock: CacheImpl::removeRegion");
std::lock_guard<decltype(m_destroyCacheMutex)> lock(m_destroyCacheMutex);
+ LOGDEBUG("locked: CacheImpl::removeRegion");
if (!m_destroyPending) {
m_regions.erase(name);
}
@@ -241,7 +243,9 @@
sendNotificationCloseMsgs();
{
+ LOGDEBUG("recursive lock: CacheImpl::setKeepAlive");
std::lock_guard<decltype(m_destroyCacheMutex)> lock(m_destroyCacheMutex);
+ LOGDEBUG("locked: CacheImpl::setKeepAlive");
if (m_destroyPending) {
return;
}
@@ -298,6 +302,8 @@
m_poolManager->close(keepalive);
+ m_poolManager.reset();
+
LOGFINE("Closed pool manager with keepalive %s",
keepalive ? "true" : "false");
@@ -331,7 +337,9 @@
}
bool CacheImpl::doIfDestroyNotPending(std::function<void()> f) {
+ LOGDEBUG("recursive lock: CacheImpl::doIfDestroyNotPending");
std::lock_guard<decltype(m_destroyCacheMutex)> lock(m_destroyCacheMutex);
+ LOGDEBUG("locked: CacheImpl::doIfDestroyNotPending");
if (!m_destroyPending) {
f();
}
@@ -466,7 +474,9 @@
LOGDEBUG("CacheImpl::getRegion " + path);
this->throwIfClosed();
+ LOGDEBUG("recursive lock: CacheImpl::getRegion");
std::lock_guard<decltype(m_destroyCacheMutex)> lock(m_destroyCacheMutex);
+ LOGDEBUG("locked: CacheImpl::getRegion");
if (m_destroyPending) {
return nullptr;
@@ -681,8 +691,10 @@
}
void CacheImpl::processMarker() {
+ LOGDEBUG("recursive lock: CacheImpl::processMarker");
std::lock_guard<decltype(m_destroyCacheMutex)> destroy_lock(
m_destroyCacheMutex);
+ LOGDEBUG("locked: CacheImpl::processMarker");
if (m_destroyPending) {
return;
}
diff --git a/cppcache/src/Connector.hpp b/cppcache/src/Connector.hpp
index b34285d..98ce3d4 100644
--- a/cppcache/src/Connector.hpp
+++ b/cppcache/src/Connector.hpp
@@ -39,8 +39,9 @@
class Connector {
public:
- Connector() {}
- virtual ~Connector() {}
+ Connector() = default;
+ virtual ~Connector() = default;
+
Connector(const Connector &) = delete;
Connector &operator=(const Connector &) = delete;
@@ -48,13 +49,16 @@
* Reads <code>len</code> bytes of data and stores them into the buffer
* array <code>b</code>. The number of bytes actually read is returned as an
* integer. This method blocks until <code>len</code> bytes of data is
- * read, or an exception is thrown.
+ * read, the timer expires or an exception is thrown.
*
* <p> If <code>b</code> is <code>null</code>, or <code>len</code> is
* less than or equal to <code>0</code> an
* <code>IllegalArgumentException</code>
* is thrown.
*
+ * <p> If <code>len</code> bytes have not been read when the timeout
+ * expires a <code>TimeoutException</code> is thrown.
+ *
* <p> If <code>len</code> bytes cannot be read for any reason, then an
* <code>GeodeIOException</code> is thrown.
*
@@ -63,45 +67,88 @@
*
* @param b the buffer into which the data is read.
* @param len the number of bytes to read.
- * @param waitSeconds the number of seconds to allow the read to
- * complete.
+ * @param timeout time to allow the read to complete.
* @return the total number of bytes read into the buffer, or
* <code>-1</code> if an error was encountered.
* @exception GeodeIOException, TimeoutException, IllegalArgumentException,
* OutOfMemoryException.
*/
virtual size_t receive(char *b, size_t len,
- std::chrono::microseconds waitSeconds) = 0;
+ std::chrono::milliseconds timeout) = 0;
/**
+ * Reads an undetermined number of bytes of data and stores them into the
+ * buffer array <code>b</code>. The number of bytes actually read is returned
+ * as an integer. This method blocks until <code>len</code> bytes of data is
+ * read, the timeout expires or an exception is thrown.
+ *
+ * <p> If <code>b</code> is <code>null</code> an
+ * <code>IllegalArgumentException</code>
+ * is thrown.
+ *
+ * <p> The <code>read(b)</code> method for class <code>InputStream</code>
+ * has the same effect as: <pre><code> read(b, 0, b.length) </code></pre>
+ *
+ * @param b the buffer into which the data is read.
+ * @param timeout time to allow the read to complete.
+ * @return the total number of bytes read into the buffer, or
+ * <code>-1</code> if an error was encountered.
+ * @exception GeodeIOException, IllegalArgumentException,
+ * OutOfMemoryException.
+ */
+ virtual size_t receive_nothrowiftimeout(
+ char *b, size_t len, std::chrono::milliseconds timeout) = 0;
+ /**
* Writes <code>len</code> bytes from the specified byte array
* to the underlying output stream.
*
* @param b the data.
* @param len the number of bytes to write.
- * @param waitSeconds the number of seconds to allow the write to
- * complete.
+ * @param timeout time to allow the write to complete.
* @return the actual number of bytes written.
* @exception GeodeIOException, TimeoutException, IllegalArgumentException.
*/
virtual size_t send(const char *b, size_t len,
- std::chrono::microseconds waitSeconds) = 0;
-
- /**
- * Initialises the connection.
- */
- virtual void init() = 0;
-
- /**
- * Closes the connection.
- */
- virtual void close() = 0;
+ std::chrono::milliseconds timeout) = 0;
/**
* Returns local port for this TCP connection
*/
virtual uint16_t getPort() = 0;
+
+ /**
+ * Writes an array of a known size to the underlying output stream.
+ *
+ * @param array A C-style stack array. Be weary of arrays that have been
+ * decayed into pointers, they won't compile.
+ * @return The number of bytes written. Don't get confused: this is not the
+ * number of elements in the array written.
+ * @exception GeodeIOException, TimeoutException
+ */
+ template <typename T, size_t size>
+ size_t send(const T (&array)[size], std::chrono::milliseconds timeout) {
+ return send(reinterpret_cast<const char *>(array), sizeof(T) * size,
+ timeout);
+ }
+
+ /**
+ * Reads an array of a known size from the underlying input stream. If the
+ * number of bytes read is not equal to the size of the array, no exception
+ * will be thrown.
+ *
+ * @param array A C-style stack array. Be weary of arrays that have been
+ * decayed into pointers, they won't compile.
+ * @return The number of bytes written. Don't get confused: this is not the
+ * number of elements in the array written.
+ * @exception GeodeIOException
+ */
+ template <typename T, size_t size>
+ size_t receive(T (&array)[size], std::chrono::milliseconds timeout) {
+ return receive_nothrowiftimeout(reinterpret_cast<char *>(array),
+ sizeof(T) * size, timeout);
+ }
};
+
} // namespace client
} // namespace geode
} // namespace apache
diff --git a/cppcache/src/DistributedSystem.hpp b/cppcache/src/DistributedSystem.hpp
index a03ea6c..3b080b9 100644
--- a/cppcache/src/DistributedSystem.hpp
+++ b/cppcache/src/DistributedSystem.hpp
@@ -48,7 +48,6 @@
class SystemProperties;
class DistributedSystemImpl;
class CacheRegionHelper;
-class TcrConnection;
class APACHE_GEODE_EXPORT DistributedSystem {
/**
@@ -108,7 +107,6 @@
friend class CacheRegionHelper;
friend class DistributedSystemImpl;
- friend class TcrConnection;
};
} // namespace client
} // namespace geode
diff --git a/cppcache/src/PoolFactory.cpp b/cppcache/src/PoolFactory.cpp
index 9a43f64..465a1fc 100644
--- a/cppcache/src/PoolFactory.cpp
+++ b/cppcache/src/PoolFactory.cpp
@@ -172,14 +172,12 @@
}
PoolFactory& PoolFactory::addLocator(const std::string& host, int port) {
- addCheck(host, port);
m_attrs->addLocator(host, port);
m_addedServerOrLocator = true;
return *this;
}
PoolFactory& PoolFactory::addServer(const std::string& host, int port) {
- addCheck(host, port);
m_attrs->addServer(host, port);
m_addedServerOrLocator = true;
return *this;
@@ -306,12 +304,12 @@
}
PoolFactory& PoolFactory::addCheck(const std::string& host, int port) {
- if (port <= 0) {
- throw IllegalArgumentException("port must be greater than 0 but was " +
- std::to_string(port));
- }
-
if (m_attrs->getSniProxyHost().empty()) {
+ if (port <= 0) {
+ throw IllegalArgumentException("port must be greater than 0 but was " +
+ std::to_string(port));
+ }
+
ACE_INET_Addr addr(port, host.c_str());
#ifdef WITH_IPV6
// check unknown host
diff --git a/cppcache/src/ServerLocation.hpp b/cppcache/src/ServerLocation.hpp
index a0ce231..91cd670 100644
--- a/cppcache/src/ServerLocation.hpp
+++ b/cppcache/src/ServerLocation.hpp
@@ -89,7 +89,7 @@
}
void printInfo() {
- LOGDEBUG(" Got Host %s, and port %d", getServerName().c_str(), m_port);
+ LOGDEBUG(" Got Host \"%s\", and port %d", getServerName().c_str(), m_port);
}
bool operator<(const ServerLocation rhs) const {
diff --git a/cppcache/src/TcpConn.cpp b/cppcache/src/TcpConn.cpp
index d6e709e..298384c 100644
--- a/cppcache/src/TcpConn.cpp
+++ b/cppcache/src/TcpConn.cpp
@@ -17,253 +17,406 @@
#include "TcpConn.hpp"
-#include <thread>
+#include <iomanip>
+#include <iostream>
-#include <ace/SOCK_Connector.h>
-#include <boost/interprocess/mapped_region.hpp>
-
-#include <geode/ExceptionTypes.hpp>
-#include <geode/internal/chrono/duration.hpp>
+#include <boost/optional.hpp>
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
#include "util/Log.hpp"
+namespace {
+template <int Level, int Name>
+class timeval {
+ public:
+ // This is not an instance of the template, but of the system provided type
+ // to be written to the socket API.
+#if defined(_WINDOWS)
+ using value_type = DWORD;
+#else
+ using value_type = ::timeval;
+#endif
+
+ private:
+ value_type value_{};
+
+ public:
+ timeval() {}
+
+ explicit timeval(value_type v) : value_(v) {}
+
+ timeval &operator=(value_type v) {
+ value_ = v;
+ return *this;
+ }
+
+ value_type value() const { return value_; }
+
+ template <typename Protocol>
+ int level(const Protocol &) const {
+ return Level;
+ }
+
+ template <typename Protocol>
+ int name(const Protocol &) const {
+ return Name;
+ }
+
+ template <typename Protocol>
+ value_type *data(const Protocol &) {
+ return &value_;
+ }
+
+ template <typename Protocol>
+ const value_type *data(const Protocol &) const {
+ return &value_;
+ }
+
+ template <typename Protocol>
+ std::size_t size(const Protocol &) const {
+ return sizeof(value_);
+ }
+
+ template <typename Protocol>
+ void resize(const Protocol &, std::size_t s) {
+ if (s != sizeof(value_)) {
+ throw std::length_error("timeval socket option resize");
+ }
+ }
+};
+
+// Asio doesn't support these socket options directly, but every major platform
+// does. Timeout on IO socket operations are supported by the platform directly.
+// This means We can all receive without needing to use the timeout interface -
+// and more importantly, we can send while holding to per-operation time
+// constraints and without blocking indefinitely.
+//
+// The default timeout is infinite, or by setting the socket option to null,
+// which I won't provide - just don't construct a TcpConn with send and
+// receieve timeouts.
+typedef timeval<SOL_SOCKET, SO_SNDTIMEO> send_timeout;
+typedef timeval<SOL_SOCKET, SO_RCVTIMEO> receive_timeout;
+} // namespace
+
namespace apache {
namespace geode {
namespace client {
+TcpConn::TcpConn(const std::string ipaddr,
+ std::chrono::microseconds connect_timeout,
+ int32_t maxBuffSizePool)
+ : TcpConn{
+ ipaddr.substr(0, ipaddr.find(':')),
+ static_cast<uint16_t>(std::stoi(ipaddr.substr(ipaddr.find(':') + 1))),
+ connect_timeout, maxBuffSizePool} {}
-const size_t TcpConn::kChunkSize = TcpConn::getDefaultChunkSize();
+TcpConn::TcpConn(const std::string host, uint16_t port,
+ std::chrono::microseconds timeout, int32_t maxBuffSizePool)
+ : socket_{io_context_} {
+ auto beforeResolvePoint = std::chrono::system_clock::now();
+ auto results = resolve(host, port, timeout);
+ auto elapsedTime = std::chrono::duration<double, std::micro>(
+ std::chrono::system_clock::now() - beforeResolvePoint);
-void TcpConn::clearNagle(ACE_HANDLE sock) {
- int32_t val = 1;
+ // We must connect first so we have a valid file descriptor to set options
+ // on.
+ auto connectTimeout = std::chrono::duration_cast<std::chrono::microseconds>(
+ timeout - elapsedTime);
+ connect(results, connectTimeout);
- if (0 != ACE_OS::setsockopt(sock, IPPROTO_TCP, 1,
- reinterpret_cast<const char*>(&val),
- sizeof(val))) {
- int32_t lastError = ACE_OS::last_error();
- LOGERROR("Failed to set TCP_NODELAY on socket. Errno: %d: %s", lastError,
- ACE_OS::strerror(lastError));
- }
+ socket_.set_option(::boost::asio::ip::tcp::no_delay{true});
+ socket_.set_option(
+ ::boost::asio::socket_base::send_buffer_size{maxBuffSizePool});
+ socket_.set_option(
+ ::boost::asio::socket_base::receive_buffer_size{maxBuffSizePool});
}
-int32_t TcpConn::maxSize(ACE_HANDLE sock, int32_t flag, int32_t size) {
- int32_t val = 0;
-
- int32_t inc = 32120;
- val = size - (3 * inc);
- if (val < 0) val = 0;
- if (size == 0) size = maxBuffSizePool_;
- int32_t red = 0;
- int32_t lastRed = -1;
- while (lastRed != red) {
- lastRed = red;
- val += inc;
- if (0 != ACE_OS::setsockopt(sock, SOL_SOCKET, flag,
- reinterpret_cast<const char*>(&val),
- sizeof(val))) {
- int32_t lastError = ACE_OS::last_error();
- LOGERROR("Failed to set socket options. Errno: %d : %s ", lastError,
- ACE_OS::strerror(lastError));
- }
- int plen = sizeof(val);
- if (0 != ACE_OS::getsockopt(sock, SOL_SOCKET, flag,
- reinterpret_cast<char*>(&val), &plen)) {
- int32_t lastError = ACE_OS::last_error();
- LOGERROR(
- "Failed to get buffer size for flag %d on socket. Errno: %d : %s",
- flag, lastError, ACE_OS::strerror(lastError));
- }
-#ifdef _LINUX
- val /= 2;
-#endif
- if ((val >= maxBuffSizePool_) || (val >= size)) continue;
- red = val;
- }
- return val;
-}
-
-void TcpConn::createSocket(ACE_HANDLE sock) {
- LOGDEBUG("Creating plain socket stream");
- stream_ = std::unique_ptr<ACE_SOCK_Stream>(new ACE_SOCK_Stream(sock));
-}
-
-void TcpConn::init() {
-#ifdef WITH_IPV6
- ACE_HANDLE sock = ACE_OS::socket(inetAddress_.get_type(), SOCK_STREAM, 0);
+TcpConn::TcpConn(const std::string ipaddr,
+ std::chrono::microseconds connect_timeout,
+ int32_t maxBuffSizePool, std::chrono::microseconds send_time,
+ std::chrono::microseconds receive_time)
+ : TcpConn{ipaddr, connect_timeout, maxBuffSizePool} {
+#if defined(_WINDOWS)
+ socket_.set_option(::send_timeout{static_cast<DWORD>(send_time.count())});
+ socket_.set_option(
+ ::receive_timeout{static_cast<DWORD>(receive_time.count())});
#else
- ACE_HANDLE sock = ACE_OS::socket(AF_INET, SOCK_STREAM, 0);
+
+ auto send_seconds =
+ std::chrono::duration_cast<std::chrono::seconds>(send_time);
+ auto send_microseconds =
+ send_time % std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::seconds{1});
+ socket_.set_option(
+ ::send_timeout{{static_cast<int>(send_seconds.count()),
+ static_cast<int>(send_microseconds.count())}});
+
+ auto receive_seconds =
+ std::chrono::duration_cast<std::chrono::seconds>(receive_time);
+ auto receive_microseconds =
+ receive_time % std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::seconds{1});
+ socket_.set_option(
+ ::receive_timeout{{static_cast<int>(receive_seconds.count()),
+ static_cast<int>(receive_microseconds.count())}});
#endif
- if (sock == ACE_INVALID_HANDLE) {
- int32_t lastError = ACE_OS::last_error();
- LOGERROR("Failed to create socket. Errno: %d: %s", lastError,
- ACE_OS::strerror(lastError));
- char msg[256];
- std::snprintf(msg, 256, "TcpConn::connect failed with errno: %d: %s",
- lastError, ACE_OS::strerror(lastError));
- throw GeodeIOException(msg);
- }
-
- clearNagle(sock);
-
- int32_t readSize = 0;
- int32_t writeSize = 0;
- int32_t originalReadSize = readSize;
- readSize = maxSize(sock, SO_SNDBUF, readSize);
- if (originalReadSize != readSize) {
- // This should get logged once at startup and again only if it changes
- LOGFINEST("Using socket send buffer size of %d.", readSize);
- }
- int32_t originalWriteSize = writeSize;
- writeSize = maxSize(sock, SO_RCVBUF, writeSize);
- if (originalWriteSize != writeSize) {
- // This should get logged once at startup and again only if it changes
- LOGFINEST("Using socket receive buffer size of %d.", writeSize);
- }
-
- createSocket(sock);
-
- connect();
}
-TcpConn::TcpConn(const std::string& address,
- std::chrono::microseconds waitSeconds, int32_t maxBuffSizePool)
- : stream_(nullptr),
- maxBuffSizePool_(maxBuffSizePool),
- inetAddress_(address.c_str()),
- endpoint_(address),
- timeout_(waitSeconds) {}
+TcpConn::~TcpConn() {
+ std::stringstream ss;
-TcpConn::TcpConn(const std::string& hostname, uint16_t port,
- std::chrono::microseconds waitSeconds, int32_t maxBuffSizePool)
- : stream_(nullptr),
- maxBuffSizePool_(maxBuffSizePool),
- inetAddress_(port, hostname.c_str()),
- endpoint_(hostname + ":" + std::to_string(port)),
- timeout_(waitSeconds) {}
+ try {
+ ss << "Disconnected " << socket_.local_endpoint() << " -> "
+ << socket_.remote_endpoint();
-void TcpConn::connect() {
- using apache::geode::internal::chrono::duration::to_string;
+ socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
- ACE_OS::signal(SIGPIPE, SIG_IGN); // Ignore broken pipe
+ } catch (...) {
+ ss = std::stringstream{};
- LOGFINER("Connecting plain socket stream to " + endpoint_ + " waiting " +
- to_string(timeout_));
+ ss << "Closed socket " << socket_.local_endpoint();
+ }
- const ACE_Time_Value aceTimeout(timeout_);
- const auto timeout =
- (timeout_ > std::chrono::microseconds::zero()) ? &aceTimeout : nullptr;
- if (ACE_SOCK_Connector{}.connect(*stream_, inetAddress_, timeout) == -1) {
- const auto lastError = ACE_OS::last_error();
- if (lastError == ETIME || lastError == ETIMEDOUT) {
- throw TimeoutException(
- "TcpConn::connect Attempt to connect timed out after " +
- to_string(timeout_) + ".");
+ socket_.close();
+
+ LOGFINE(ss.str());
+}
+
+size_t TcpConn::receive(char *buff, const size_t len,
+ std::chrono::milliseconds timeout) {
+ std::stringstream ss;
+ ss << "Receiving " << len << " bytes from " << socket_.remote_endpoint()
+ << " -> " << socket_.local_endpoint();
+ LOGDEBUG(ss.str());
+ return receive(buff, len, timeout, true);
+}
+
+size_t TcpConn::receive_nothrowiftimeout(char *buff, const size_t len,
+ std::chrono::milliseconds timeout) {
+ std::stringstream ss;
+ ss << "Receiving an unknown number of bytes from "
+ << socket_.remote_endpoint() << " -> " << socket_.local_endpoint();
+ LOGDEBUG(ss.str());
+ return receive(buff, len, timeout, false);
+}
+
+size_t TcpConn::receive(char *buff, const size_t len,
+ std::chrono::milliseconds timeout,
+ bool throwTimeoutException) {
+ boost::optional<boost::system::error_code> read_result;
+ std::size_t bytes_read = 0;
+
+ auto beforeResolvePoint = std::chrono::system_clock::now();
+
+ try {
+ prepareAsyncRead(buff, len, read_result, bytes_read);
+ io_context_.restart();
+ io_context_.run_for(timeout);
+ } catch (...) {
+ LOGDEBUG("Throwing an unexpected read exception");
+ throw;
+ }
+
+ if (read_result && *read_result) {
+ LOGDEBUG("Throwing a read exception: %s", read_result->message().c_str());
+ socket_.cancel();
+ // Get the abort
+ io_context_.restart();
+ io_context_.run();
+ throw boost::system::system_error{*read_result};
+ }
+
+ if (bytes_read == 0) {
+ auto elapsedTime = std::chrono::duration<double, std::micro>(
+ std::chrono::system_clock::now() - beforeResolvePoint);
+ if (elapsedTime < timeout) {
+ LOGDEBUG("Throwing an IO exception");
+ socket_.cancel();
+ // Get the abort
+ io_context_.restart();
+ io_context_.run();
+ throw boost::system::system_error{boost::asio::error::broken_pipe};
+ } else {
+ LOGDEBUG("Throwing an eof exception");
+ socket_.cancel();
+ // Get the abort
+ io_context_.restart();
+ io_context_.run();
+ throw boost::system::system_error{boost::asio::error::eof};
}
- close();
- throw GeodeIOException("TcpConn::connect failed with errno: " +
- ACE_errno_to_string(lastError));
}
- if (stream_->enable(ACE_NONBLOCK)) {
- LOGINFO("TcpConn::NONBLOCK: " + ACE_errno_to_string(ACE_OS::last_error()));
+ if (bytes_read != len && throwTimeoutException) {
+ LOGDEBUG("Throwing a read timeout exception");
+ socket_.cancel();
+ // Get the abort
+ io_context_.restart();
+ io_context_.run();
+ throw boost::system::system_error{boost::asio::error::operation_aborted};
}
+
+ return bytes_read;
}
-void TcpConn::close() {
- if (stream_) {
- stream_->close();
- stream_ = nullptr;
+size_t TcpConn::send(const char *buff, const size_t len,
+ std::chrono::milliseconds timeout) {
+ std::stringstream ss;
+ ss << "Sending " << len << " bytes from " << socket_.local_endpoint()
+ << " -> " << socket_.remote_endpoint();
+ LOGDEBUG(ss.str());
+
+ boost::optional<boost::system::error_code> write_result;
+ std::size_t bytes_written = 0;
+
+ try {
+ prepareAsyncWrite(buff, len, write_result, bytes_written);
+ io_context_.restart();
+ io_context_.run_for(timeout);
+ } catch (...) {
+ LOGDEBUG("Throwing an unexpected write exception");
+ throw;
}
-}
-size_t TcpConn::receive(char* buff, size_t len,
- std::chrono::microseconds waitSeconds) {
- return socketOp(SOCK_READ, buff, len, waitSeconds);
-}
-
-size_t TcpConn::send(const char* buff, size_t len,
- std::chrono::microseconds waitSeconds) {
- return socketOp(SOCK_WRITE, const_cast<char*>(buff), len, waitSeconds);
-}
-
-size_t TcpConn::socketOp(TcpConn::SockOp op, char* buff, size_t len,
- std::chrono::microseconds waitDuration) {
- {
- ACE_Time_Value waitTime(waitDuration);
- auto endTime = std::chrono::steady_clock::now() + waitDuration;
- size_t readLen = 0;
- ssize_t retVal;
- bool errnoSet = false;
-
- size_t totalsend = 0;
- while (len > 0 && waitTime > ACE_Time_Value::zero) {
- size_t sendlen;
- if (len > kChunkSize) {
- sendlen = kChunkSize;
- len -= kChunkSize;
- } else {
- sendlen = len;
- len = 0;
- }
- do {
- retVal = doOperation(op, buff, sendlen, waitTime, readLen);
- sendlen -= readLen;
- totalsend += readLen;
- if (retVal < 0) {
- int32_t lastError = ACE_OS::last_error();
- if (lastError == EAGAIN) {
- std::this_thread::sleep_for(std::chrono::microseconds(100));
- } else {
- errnoSet = true;
- break;
- }
- } else if (retVal == 0 && readLen == 0) {
- ACE_OS::last_error(EPIPE);
- errnoSet = true;
- break;
- }
-
- buff += readLen;
- if (sendlen == 0) break;
- waitTime = endTime - std::chrono::steady_clock::now();
- if (waitTime <= ACE_Time_Value::zero) break;
- } while (sendlen > 0);
- if (errnoSet) break;
- }
-
- if (len > 0 && !errnoSet) {
- ACE_OS::last_error(ETIME);
- }
-
- return totalsend;
+ if (write_result && *write_result) {
+ LOGDEBUG("Throwing a write exception. %s", write_result->message().c_str());
+ throw boost::system::system_error{*write_result};
}
-}
-ssize_t TcpConn::doOperation(const TcpConn::SockOp& op, void* buff,
- size_t sendlen, ACE_Time_Value& waitTime,
- size_t& readLen) const {
- if (op == SOCK_READ) {
- return stream_->recv_n(buff, sendlen, &waitTime, &readLen);
- } else {
- return stream_->send_n(buff, sendlen, &waitTime, &readLen);
+
+ if (bytes_written != len) {
+ LOGDEBUG("Throwing a write timeout exception");
+ socket_.cancel();
+ // Get the abort
+ io_context_.restart();
+ io_context_.run();
+ throw boost::system::system_error{boost::asio::error::operation_aborted};
}
+
+ return bytes_written;
}
// Return the local port for this TCP connection.
-uint16_t TcpConn::getPort() {
- ACE_INET_Addr localAddr;
- stream_->get_local_addr(localAddr);
- return localAddr.get_port_number();
-}
+uint16_t TcpConn::getPort() { return socket_.local_endpoint().port(); }
-size_t TcpConn::getDefaultChunkSize() {
- //
- auto pageSize = boost::interprocess::mapped_region::get_page_size();
- if (pageSize > 16000000) {
- return 16000000;
- } else if (pageSize > 0) {
- return pageSize + (16000000 / pageSize) * pageSize;
+void TcpConn::connect(boost::asio::ip::tcp::resolver::results_type r,
+ std::chrono::microseconds timeout) {
+ boost::optional<boost::system::error_code> connect_result;
+
+ try {
+ // We must connect first so we have a valid file descriptor to set
+ // options on.
+ boost::asio::async_connect(
+ socket_, r,
+ [&connect_result](const boost::system::error_code &ec,
+ const boost::asio::ip::tcp::endpoint) {
+ connect_result = ec;
+ });
+
+ io_context_.restart();
+ io_context_.run_for(timeout);
+ } catch (...) {
+ LOGDEBUG("Throwing an unexpected connect exception");
+ throw;
}
- return 16000000;
+ if (connect_result && *connect_result) {
+ LOGDEBUG("Throwing a connect exception: %s",
+ connect_result->message().c_str());
+ throw boost::system::system_error{*connect_result};
+ }
+
+ if (!connect_result) {
+ LOGDEBUG("Throwing a connect timeout exception");
+ throw boost::system::system_error{boost::asio::error::operation_aborted};
+ }
+
+ std::stringstream ss;
+ ss << "Connected " << socket_.local_endpoint() << " -> "
+ << socket_.remote_endpoint();
+ LOGDEBUG(ss.str());
+}
+
+boost::asio::ip::tcp::resolver::results_type TcpConn::resolve(
+ const std::string host, uint16_t port, std::chrono::microseconds timeout) {
+ boost::optional<boost::system::error_code> resolve_result;
+ boost::asio::ip::tcp::resolver::results_type results;
+
+ try {
+ boost::asio::ip::tcp::resolver resolver(io_context_);
+ resolver.async_resolve(host, std::to_string(port),
+ [&resolve_result, &results](
+ const boost::system::error_code &ec,
+ boost::asio::ip::tcp::resolver::results_type r) {
+ if (ec) {
+ resolve_result = ec;
+ } else {
+ resolve_result = boost::system::error_code{};
+ results = r;
+ }
+ });
+
+ io_context_.restart();
+ io_context_.run_for(timeout);
+ } catch (...) {
+ LOGDEBUG("Throwing an unexpected resolve exception");
+ throw;
+ }
+
+ if (resolve_result && *resolve_result) {
+ LOGDEBUG("Throwing a resolve exception: %s",
+ resolve_result->message().c_str());
+ throw boost::system::system_error{*resolve_result};
+ }
+
+ if (!resolve_result) {
+ LOGDEBUG("Throwing a resolve timeout exception");
+ socket_.cancel();
+ // Get the abort
+ io_context_.restart();
+ io_context_.run();
+ throw boost::system::system_error{boost::asio::error::operation_aborted};
+ }
+
+ return results;
+}
+
+void TcpConn::prepareAsyncRead(
+ char *buff, size_t len,
+ boost::optional<boost::system::error_code> &read_result,
+ std::size_t &bytes_read) {
+ boost::asio::async_read(
+ socket_, boost::asio::buffer(buff, len),
+ [&read_result, &bytes_read](const boost::system::error_code &ec,
+ const size_t n) {
+ bytes_read = n;
+
+ // EOF itself occurs when there is no data available on the socket at
+ // the time of the read. It may simply imply data has yet to arrive.
+ // Do nothing. Defer to timeout rather than assume a broken
+ // connection.
+ if (ec != boost::asio::error::eof &&
+ ec != boost::asio::error::try_again) {
+ read_result = ec;
+ return;
+ }
+ });
+}
+
+void TcpConn::prepareAsyncWrite(
+ const char *buff, size_t len,
+ boost::optional<boost::system::error_code> &write_result,
+ std::size_t &bytes_written) {
+ boost::asio::async_write(
+ socket_, boost::asio::buffer(buff, len),
+ [&write_result, &bytes_written](const boost::system::error_code &ec,
+ const size_t n) {
+ bytes_written = n;
+
+ if (ec != boost::asio::error::eof &&
+ ec != boost::asio::error::try_again) {
+ write_result = ec;
+ return;
+ }
+ });
}
} // namespace client
diff --git a/cppcache/src/TcpConn.hpp b/cppcache/src/TcpConn.hpp
index ed7d406..9113dd4 100644
--- a/cppcache/src/TcpConn.hpp
+++ b/cppcache/src/TcpConn.hpp
@@ -20,10 +20,8 @@
#ifndef GEODE_TCPCONN_H_
#define GEODE_TCPCONN_H_
-#include <chrono>
-#include <memory>
-
-#include <ace/SOCK_Stream.h>
+#include <boost/asio.hpp>
+#include <boost/optional.hpp>
#include <geode/internal/geode_globals.hpp>
@@ -32,62 +30,50 @@
namespace apache {
namespace geode {
namespace client {
-
-inline std::string ACE_errno_to_string(decltype(ACE_OS::last_error()) error) {
- return std::to_string(error) + ": " + ACE_OS::strerror(error);
-}
-
class TcpConn : public Connector {
- private:
- std::unique_ptr<ACE_SOCK_Stream> stream_;
- const int32_t maxBuffSizePool_;
+ size_t receive(char*, size_t, std::chrono::milliseconds) override;
+ size_t receive_nothrowiftimeout(char*, size_t,
+ std::chrono::milliseconds) override;
+ size_t send(const char*, size_t, std::chrono::milliseconds) override;
- /**
- * Attempt to set chunk size to nearest OS page size for perf improvement
- */
- static size_t getDefaultChunkSize();
+ uint16_t getPort() override final;
protected:
- ACE_INET_Addr inetAddress_;
- std::string endpoint_;
- std::chrono::microseconds timeout_;
- static const size_t kChunkSize;
+ boost::asio::io_context io_context_;
+ boost::asio::ip::tcp::socket socket_;
- enum SockOp { SOCK_READ, SOCK_WRITE };
+ boost::asio::ip::tcp::resolver::results_type resolve(
+ const std::string hostname, uint16_t port,
+ std::chrono::microseconds timeout);
- void clearNagle(ACE_HANDLE sock);
- int32_t maxSize(ACE_HANDLE sock, int32_t flag, int32_t size);
+ void connect(boost::asio::ip::tcp::resolver::results_type r,
+ std::chrono::microseconds connect_timeout);
- virtual size_t socketOp(SockOp op, char* buff, size_t len,
- std::chrono::microseconds waitDuration);
+ size_t receive(char*, size_t, std::chrono::milliseconds,
+ bool throwTimeoutException);
- virtual void createSocket(ACE_HANDLE sock);
+ virtual void prepareAsyncRead(
+ char* buff, size_t len,
+ boost::optional<boost::system::error_code>& read_result,
+ std::size_t& bytes_read);
- virtual ssize_t doOperation(const SockOp& op, void* buff, size_t sendlen,
- ACE_Time_Value& waitTime, size_t& readLen) const;
+ virtual void prepareAsyncWrite(
+ const char* buff, size_t len,
+ boost::optional<boost::system::error_code>& write_result,
+ std::size_t& bytes_written);
public:
- TcpConn(const std::string& hostname, uint16_t port,
- std::chrono::microseconds waitSeconds, int32_t maxBuffSizePool);
-
- TcpConn(const std::string& address, std::chrono::microseconds waitSeconds,
+ TcpConn(const std::string ipaddr, std::chrono::microseconds connect_timeout,
int32_t maxBuffSizePool);
- ~TcpConn() override {}
+ TcpConn(const std::string hostname, uint16_t port,
+ std::chrono::microseconds connect_timeout, int32_t maxBuffSizePool);
- void close() override;
+ TcpConn(const std::string ipaddr, std::chrono::microseconds connect_timeout,
+ int32_t maxBuffSizePool, std::chrono::microseconds send_timeout,
+ std::chrono::microseconds receive_timeout);
- void init() override;
-
- virtual void connect();
-
- size_t receive(char* buff, size_t len,
- std::chrono::microseconds waitSeconds) override;
-
- size_t send(const char* buff, size_t len,
- std::chrono::microseconds waitSeconds) override;
-
- virtual uint16_t getPort() override;
+ ~TcpConn() override;
};
} // namespace client
diff --git a/cppcache/src/TcpSslConn.cpp b/cppcache/src/TcpSslConn.cpp
index 8ede9a3..956af7a 100644
--- a/cppcache/src/TcpSslConn.cpp
+++ b/cppcache/src/TcpSslConn.cpp
@@ -17,9 +17,15 @@
#include "TcpSslConn.hpp"
-#include <memory>
+#include <openssl/err.h>
+#include <openssl/x509.h>
-#include <ace/SSL/SSL_SOCK_Connector.h>
+#include <chrono>
+#include <iostream>
+#include <thread>
+
+#include <boost/exception/diagnostic_information.hpp>
+#include <boost/optional.hpp>
#include <geode/ExceptionTypes.hpp>
#include <geode/SystemProperties.hpp>
@@ -29,105 +35,167 @@
namespace apache {
namespace geode {
namespace client {
-std::atomic_flag TcpSslConn::initialized_ = ATOMIC_FLAG_INIT;
-void TcpSslConn::createSocket(ACE_HANDLE sock) {
- LOGDEBUG("Creating SSL socket stream");
- stream_ = std::unique_ptr<ACE_SSL_SOCK_Stream>(new ACE_SSL_SOCK_Stream());
- stream_->set_handle(sock);
+TcpSslConn::TcpSslConn(const std::string& hostname, uint16_t,
+ const std::string& sniProxyHostname,
+ uint16_t sniProxyPort,
+ std::chrono::microseconds connect_timeout,
+ int32_t maxBuffSizePool, const std::string& pubkeyfile,
+ const std::string& privkeyfile,
+ const std::string& pemPassword)
+ : TcpConn{sniProxyHostname, sniProxyPort, connect_timeout, maxBuffSizePool},
+ ssl_context_{boost::asio::ssl::context::sslv23_client},
+ strand_(io_context_) {
+ init(pubkeyfile, privkeyfile, pemPassword, hostname);
}
-void TcpSslConn::connect() {
- using apache::geode::internal::chrono::duration::to_string;
-
- ACE_OS::signal(SIGPIPE, SIG_IGN); // Ignore broken pipe
-
- LOGFINER(std::string("Connecting SSL socket stream to ") +
- inetAddress_.get_host_name() + ":" +
- std::to_string(inetAddress_.get_port_number()) + " waiting " +
- to_string(timeout_));
-
- if (!sniHostname_.empty()) {
- SSL_set_tlsext_host_name(stream_->ssl(), sniHostname_.c_str());
- }
-
- ACE_SSL_SOCK_Connector conn;
- ACE_Time_Value actTimeout(timeout_);
- if (conn.connect(*stream_, inetAddress_,
- timeout_ > std::chrono::microseconds::zero()
- ? &actTimeout
- : nullptr) == -1) {
- const auto lastError = ACE_OS::last_error();
- if (lastError == ETIME || lastError == ETIMEDOUT) {
- throw TimeoutException(
- "TcpSslConn::connect Attempt to connect timed out after " +
- to_string(timeout_) + ".");
- }
- close();
- throw GeodeIOException("TcpSslConn::connect failed with errno: " +
- ACE_errno_to_string(lastError));
- }
+TcpSslConn::TcpSslConn(const std::string& hostname, uint16_t port,
+ std::chrono::microseconds connect_timeout,
+ int32_t maxBuffSizePool, const std::string& pubkeyfile,
+ const std::string& privkeyfile,
+ const std::string& pemPassword)
+ : TcpConn{hostname, port, connect_timeout, maxBuffSizePool},
+ ssl_context_{boost::asio::ssl::context::sslv23_client},
+ strand_(io_context_) {
+ init(pubkeyfile, privkeyfile, pemPassword);
}
-void TcpSslConn::close() {
- if (stream_) {
- stream_->close();
- stream_ = nullptr;
- }
-}
+TcpSslConn::TcpSslConn(const std::string& ipaddr,
+ std::chrono::microseconds connect_timeout,
+ int32_t maxBuffSizePool, const std::string& pubkeyfile,
+ const std::string& privkeyfile,
+ const std::string& pemPassword)
+ : TcpSslConn{
+ ipaddr.substr(0, ipaddr.find(':')),
+ static_cast<uint16_t>(std::stoi(ipaddr.substr(ipaddr.find(':') + 1))),
+ connect_timeout,
+ maxBuffSizePool,
+ pubkeyfile,
+ privkeyfile,
+ pemPassword} {}
-uint16_t TcpSslConn::getPort() {
- ACE_INET_Addr localAddr;
- stream_->get_local_addr(localAddr);
- return localAddr.get_port_number();
-}
+TcpSslConn::TcpSslConn(const std::string& ipaddr,
+ std::chrono::microseconds connect_timeout,
+ int32_t maxBuffSizePool,
+ const std::string& sniProxyHostname,
+ uint16_t sniProxyPort, const std::string& pubkeyfile,
+ const std::string& privkeyfile,
+ const std::string& pemPassword)
+ : TcpSslConn{
+ ipaddr.substr(0, ipaddr.find(':')),
+ static_cast<uint16_t>(std::stoi(ipaddr.substr(ipaddr.find(':') + 1))),
+ sniProxyHostname,
+ sniProxyPort,
+ connect_timeout,
+ maxBuffSizePool,
+ pubkeyfile,
+ privkeyfile,
+ pemPassword} {}
-static int pem_passwd_cb(char* buf, int size, int /*rwflag*/, void* passwd) {
- strncpy(buf, reinterpret_cast<char*>(passwd), size);
- buf[size - 1] = '\0';
- return static_cast<int>(strlen(buf));
-}
+void TcpSslConn::init(const std::string& pubkeyfile,
+ const std::string& privkeyfile,
+ const std::string& pemPassword,
+ const std::string& sniHostname) {
+ // Most of the SSL configuration provided *through* Asio is on the context.
+ // This configuration is copied into each SSL instance upon construction.
+ // That means you need to get your configuration in order before you
+ // construct the stream and connect the socket.
+ LOGDEBUG(
+ "*** TcpSslConn init, pubkeyfile = %s, pemPassword = %s, sniHostname = "
+ "%s",
+ pubkeyfile.c_str(), pemPassword.c_str(), sniHostname.c_str());
-void TcpSslConn::initSsl() {
- if (!TcpSslConn::initialized_.test_and_set()) {
- auto sslContext = ACE_SSL_Context::instance();
+ try {
+ ssl_context_.set_verify_mode(boost::asio::ssl::verify_peer);
+ ssl_context_.load_verify_file(pubkeyfile);
- SSL_CTX_set_cipher_list(sslContext->context(), "DEFAULT");
- sslContext->set_mode(ACE_SSL_Context::SSLv23_client);
- sslContext->set_verify_peer();
- if (sslContext->load_trusted_ca(trustStoreFile_.c_str()) != 0) {
- throw SslException("Failed to read SSL trust store.");
+ ssl_context_.set_password_callback(
+ [pemPassword](std::size_t /*max_length*/,
+ boost::asio::ssl::context::password_purpose /*purpose*/) {
+ return pemPassword;
+ });
+
+ if (!privkeyfile.empty()) {
+ ssl_context_.use_certificate_chain_file(privkeyfile);
+ ssl_context_.use_private_key_file(
+ privkeyfile, boost::asio::ssl::context::file_format::pem);
}
- if (!password_.empty()) {
- SSL_CTX_set_default_passwd_cb(sslContext->context(), pem_passwd_cb);
- SSL_CTX_set_default_passwd_cb_userdata(
- sslContext->context(), const_cast<char*>(password_.c_str()));
- }
+ auto stream = std::unique_ptr<ssl_stream_type>(
+ new ssl_stream_type{socket_, ssl_context_});
- if (!privateKeyFile_.empty()) {
- if (sslContext->certificate(privateKeyFile_.c_str()) != 0) {
- throw SslException("Failed to read SSL certificate.");
- }
- if (sslContext->private_key(privateKeyFile_.c_str()) != 0) {
- throw SslException("Invalid SSL keystore password.");
- }
- if (SSL_CTX_use_certificate_chain_file(sslContext->context(),
- privateKeyFile_.c_str()) <= 0) {
- throw SslException("Failed to read SSL certificate chain.");
- }
- }
+ SSL_set_tlsext_host_name(stream->native_handle(), sniHostname.c_str());
+
+ stream->handshake(ssl_stream_type::client);
+
+ std::stringstream ss;
+ ss << "Setup SSL " << socket_.local_endpoint() << " -> "
+ << socket_.remote_endpoint();
+ LOGINFO(ss.str());
+
+ ss.clear();
+ ss << "SNI hostname: " << sniHostname;
+ LOGINFO(ss.str());
+
+ socket_stream_ = std::move(stream);
+ } catch (const boost::exception& ex) {
+ // error handling
+ std::string info = boost::diagnostic_information(ex);
+ LOGDEBUG("caught boost exception: %s", info.c_str());
+ throw apache::geode::client::SslException(info.c_str());
}
}
-ssize_t TcpSslConn::doOperation(const TcpConn::SockOp& op, void* buff,
- size_t sendlen, ACE_Time_Value& waitTime,
- size_t& readLen) const {
- if (op == SOCK_READ) {
- return stream_->recv_n(buff, sendlen, &waitTime, &readLen);
- } else {
- return stream_->send_n(buff, sendlen, &waitTime, &readLen);
+TcpSslConn::~TcpSslConn() {
+ std::stringstream ss;
+ ss << "Teardown SSL " << socket_.local_endpoint() << " -> ";
+ try {
+ ss << socket_.remote_endpoint();
+ } catch (...) {
}
+ LOGFINE(ss.str());
+}
+
+void TcpSslConn::prepareAsyncRead(
+ char* buff, size_t len,
+ boost::optional<boost::system::error_code>& read_result,
+ std::size_t& bytes_read) {
+ boost::asio::async_read(
+ *socket_stream_, boost::asio::buffer(buff, len),
+ boost::asio::bind_executor(
+ strand_, [&read_result, &bytes_read](
+ const boost::system::error_code& ec, const size_t n) {
+ bytes_read = n;
+
+ // EOF itself occurs when there is no data available on the socket
+ // at the time of the read. It may simply imply data has yet to
+ // arrive. Do nothing. Defer to timeout rather than assume a broken
+ // connection.
+ if (ec != boost::asio::error::eof &&
+ ec != boost::asio::error::try_again) {
+ read_result = ec;
+ return;
+ }
+ }));
+}
+
+void TcpSslConn::prepareAsyncWrite(
+ const char* buff, size_t len,
+ boost::optional<boost::system::error_code>& write_result,
+ std::size_t& bytes_written) {
+ boost::asio::async_write(
+ *socket_stream_, boost::asio::buffer(buff, len),
+ boost::asio::bind_executor(
+ strand_, [&write_result, &bytes_written](
+ const boost::system::error_code& ec, const size_t n) {
+ bytes_written = n;
+
+ if (ec != boost::asio::error::eof &&
+ ec != boost::asio::error::try_again) {
+ write_result = ec;
+ return;
+ }
+ }));
}
} // namespace client
diff --git a/cppcache/src/TcpSslConn.hpp b/cppcache/src/TcpSslConn.hpp
index 6a543b1..adf47f2 100644
--- a/cppcache/src/TcpSslConn.hpp
+++ b/cppcache/src/TcpSslConn.hpp
@@ -20,23 +20,7 @@
#ifndef GEODE_TCPSSLCONN_H_
#define GEODE_TCPSSLCONN_H_
-#include <atomic>
-#include <chrono>
-#include <string>
-
-#if defined(_WIN32)
-#pragma warning(push)
-#pragma warning(disable : 4311)
-#pragma warning(disable : 4302)
-#endif
-
-#pragma pack(push)
-#include <ace/SSL/SSL_SOCK_Stream.h>
-#pragma pack(pop)
-
-#if defined(_WIN32)
-#pragma warning(pop)
-#endif
+#include <boost/asio/ssl.hpp>
#include "TcpConn.hpp"
@@ -45,63 +29,51 @@
namespace client {
class TcpSslConn : public TcpConn {
- private:
- static std::atomic_flag initialized_;
- const std::string trustStoreFile_;
- const std::string privateKeyFile_;
- const std::string password_;
- std::string sniHostname_;
- std::unique_ptr<ACE_SSL_SOCK_Stream> stream_;
-
protected:
- void createSocket(ACE_HANDLE sock) override;
+ using ssl_stream_type =
+ boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>;
- ssize_t doOperation(const SockOp& op, void* buff, size_t sendlen,
- ACE_Time_Value& waitTime, size_t& readLen) const override;
+ boost::asio::ssl::context ssl_context_;
+ std::unique_ptr<ssl_stream_type> socket_stream_;
+ boost::asio::io_context::strand strand_;
- void initSsl();
+ void prepareAsyncRead(char* buff, size_t len,
+ boost::optional<boost::system::error_code>& read_result,
+ std::size_t& bytes_read) override;
+
+ void prepareAsyncWrite(
+ const char* buff, size_t len,
+ boost::optional<boost::system::error_code>& write_result,
+ std::size_t& bytes_written) override;
public:
- TcpSslConn(const std::string& ipaddr, std::chrono::microseconds waitSeconds,
- int32_t maxBuffSizePool, const std::string& sniProxyHostname,
- uint16_t sniProxyPort, std::string publicKeyFile,
- std::string privateKeyFile, std::string password)
- : TcpConn(sniProxyHostname, sniProxyPort, waitSeconds, maxBuffSizePool),
- trustStoreFile_(std::move(publicKeyFile)),
- privateKeyFile_(std::move(privateKeyFile)),
- password_(std::move(password)),
- sniHostname_(ipaddr.substr(0, ipaddr.find(':'))) {
- initSsl();
- }
+ TcpSslConn(const std::string& hostname, uint16_t port,
+ const std::string& sniProxyHostname, uint16_t sniProxyPort,
+ std::chrono::microseconds connect_timeout, int32_t maxBuffSizePool,
+ const std::string& pubkeyfile, const std::string& privkeyfile,
+ const std::string& pemPassword);
TcpSslConn(const std::string& hostname, uint16_t port,
std::chrono::microseconds connect_timeout, int32_t maxBuffSizePool,
- const std::string& publicKeyFile,
- const std::string& privateKeyFile, const std::string& password)
- : TcpConn(hostname.c_str(), port, connect_timeout, maxBuffSizePool),
- trustStoreFile_(std::move(publicKeyFile)),
- privateKeyFile_(std::move(privateKeyFile)),
- password_(std::move(password)) {
- initSsl();
- }
+ const std::string& pubkeyfile, const std::string& privkeyfile,
+ const std::string& pemPassword);
- TcpSslConn(const std::string& address, std::chrono::microseconds waitSeconds,
- int32_t maxBuffSizePool, std::string publicKeyFile,
- std::string privateKeyFile, std::string password)
- : TcpConn(address, waitSeconds, maxBuffSizePool),
- trustStoreFile_(std::move(publicKeyFile)),
- privateKeyFile_(std::move(privateKeyFile)),
- password_(std::move(password)) {
- initSsl();
- }
+ TcpSslConn(const std::string& ipaddr,
+ std::chrono::microseconds connect_timeout, int32_t maxBuffSizePool,
+ const std::string& pubkeyfile, const std::string& privkeyfile,
+ const std::string& pemPassword);
- virtual ~TcpSslConn() noexcept override = default;
+ TcpSslConn(const std::string& ipaddr, std::chrono::microseconds waitSeconds,
+ int32_t maxBuffSizePool, const std::string& sniProxyHostname,
+ uint16_t sniProxyPort, const std::string& publicKeyFile,
+ const std::string& privateKeyFile, const std::string& password);
- void close() override;
+ ~TcpSslConn() override;
- void connect() override;
-
- uint16_t getPort() override;
+ private:
+ void init(const std::string& pubkeyfile, const std::string& privkeyfile,
+ const std::string& pemPassword,
+ const std::string& sniHostname = "");
};
} // namespace client
} // namespace geode
diff --git a/cppcache/src/TcrConnection.cpp b/cppcache/src/TcrConnection.cpp
index 3c598f7..602c084 100644
--- a/cppcache/src/TcrConnection.cpp
+++ b/cppcache/src/TcrConnection.cpp
@@ -25,6 +25,7 @@
#include "ClientProxyMembershipID.hpp"
#include "Connector.hpp"
#include "DistributedSystemImpl.hpp"
+#include "TcpConn.hpp"
#include "TcpSslConn.hpp"
#include "TcrConnectionManager.hpp"
#include "TcrEndpoint.hpp"
@@ -33,77 +34,115 @@
#include "Utils.hpp"
#include "Version.hpp"
-namespace apache {
-namespace geode {
-namespace client {
-
-const int HEADER_LENGTH = 17;
-const int CHUNK_HEADER_LENGTH = 5;
-const int8_t LAST_CHUNK_MASK = 0x1;
-const int64_t INITIAL_CONNECTION_ID = 26739;
-
#define throwException(ex) \
do { \
LOGFINEST(ex.getName() + ": " + ex.getMessage()); \
throw ex; \
} while (0)
+namespace {
+bool useReplyTimeout(const apache::geode::client::TcrMessage& request) {
+ switch (request.getMessageType()) {
+ case apache::geode::client::TcrMessage::QUERY:
+ case apache::geode::client::TcrMessage::QUERY_WITH_PARAMETERS:
+ case apache::geode::client::TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE:
+ case apache::geode::client::TcrMessage::GETDURABLECQS_MSG_TYPE:
+ case apache::geode::client::TcrMessage::EXECUTE_FUNCTION:
+ case apache::geode::client::TcrMessage::EXECUTE_REGION_FUNCTION:
+ case apache::geode::client::TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP:
+ return true;
+ default:
+ break;
+ }
+
+ return false;
+}
+
+int expiryTimeVariancePercentage() {
+ auto nowTimePoint = std::chrono::steady_clock::now().time_since_epoch();
+ auto now_ms =
+ std::chrono::duration_cast<std::chrono::milliseconds>(nowTimePoint)
+ .count();
+ auto now_s =
+ std::chrono::duration_cast<std::chrono::seconds>(nowTimePoint).count();
+
+ srand(static_cast<unsigned int>((now_s * 1000) + (now_ms / 1000)));
+
+ const int numbers = 21;
+ int random = rand() % numbers + 1;
+
+ if (random > 10) {
+ random = random - numbers;
+ }
+ return random;
+}
+
+const int HEADER_LENGTH = 17;
+const int CHUNK_HEADER_LENGTH = 5;
+const int8_t LAST_CHUNK_MASK = 0x1;
+const int64_t INITIAL_CONNECTION_ID = 26739;
+
struct FinalizeProcessChunk {
private:
- TcrMessage& m_reply;
+ apache::geode::client::TcrMessage& m_reply;
uint16_t m_endpointMemId;
public:
- FinalizeProcessChunk(TcrMessageReply& reply, uint16_t endpointMemId)
+ FinalizeProcessChunk(apache::geode::client::TcrMessageReply& reply,
+ uint16_t endpointMemId)
: m_reply(reply), m_endpointMemId(endpointMemId) {}
~FinalizeProcessChunk() noexcept(false) {
// Enqueue a nullptr chunk indicating a wait for processing to complete.
m_reply.processChunk(std::vector<uint8_t>(), 0, m_endpointMemId);
}
};
+} // namespace
+
+namespace apache {
+namespace geode {
+namespace client {
+
+TcrConnection::TcrConnection(const TcrConnectionManager& connectionManager)
+ : connectionId(0),
+ m_connectionManager(connectionManager),
+ m_expiryTimeVariancePercentage{expiryTimeVariancePercentage()},
+ m_hasServerQueue(NON_REDUNDANT_SERVER),
+ m_queueSize(0),
+ m_port(0),
+ m_chunksProcessSema(0),
+ m_isBeingUsed(false),
+ m_isUsed(0),
+ m_poolDM(nullptr) {}
bool TcrConnection::initTcrConnection(
- TcrEndpoint* endpointObj, const char* endpoint,
+ std::shared_ptr<TcrEndpoint> endpointObj,
synchronized_set<std::unordered_set<uint16_t>>& ports,
bool isClientNotification, bool isSecondary,
std::chrono::microseconds connectTimeout) {
- m_conn = nullptr;
m_endpointObj = endpointObj;
m_poolDM = dynamic_cast<ThinClientPoolDM*>(m_endpointObj->getPoolHADM());
- // add to the connection reference counter of the endpoint
- m_endpointObj->addConnRefCounter(1);
- // m_connected = isConnected;
m_hasServerQueue = NON_REDUNDANT_SERVER;
m_queueSize = 0;
- // m_chunksProcessSema = 0;
- m_creationTime = clock::now();
+ m_lastAccessed = m_creationTime = std::chrono::steady_clock::now();
connectionId = INITIAL_CONNECTION_ID;
- m_lastAccessed = clock::now();
auto cacheImpl = m_poolDM->getConnectionManager().getCacheImpl();
const auto& distributedSystem = cacheImpl->getDistributedSystem();
const auto& sysProp = distributedSystem.getSystemProperties();
- LOGDEBUG(
- "Tcrconnection const isSecondary = %d and isClientNotification = %d, "
- "this = %p, conn ref to endopint %d",
- isSecondary, isClientNotification, this,
- m_endpointObj->getConnRefCounter());
bool isPool = false;
m_isBeingUsed = false;
- m_endpoint = endpoint;
// Precondition:
// 1. isSecondary ==> isClientNotification
// Create TcpConn object which manages a socket connection with the endpoint.
if (endpointObj && endpointObj->getPoolHADM()) {
- m_conn = createConnection(
- m_endpoint, connectTimeout,
- static_cast<int32_t>(
- endpointObj->getPoolHADM()->getSocketBufferSize()));
+ createConnection(m_endpointObj->name().c_str(), connectTimeout,
+ static_cast<int32_t>(
+ endpointObj->getPoolHADM()->getSocketBufferSize()));
isPool = true;
} else {
- m_conn = createConnection(m_endpoint, connectTimeout,
- sysProp.maxSocketBufferSize());
+ createConnection(m_endpointObj->name().c_str(), connectTimeout,
+ sysProp.maxSocketBufferSize());
}
auto handShakeMsg = cacheImpl->createDataOutput();
@@ -225,7 +264,8 @@
"invoking getCredentials");
const auto& tmpAuthIniSecurityProperties =
- authInitialize->getCredentials(tmpSecurityProperties, m_endpoint);
+ authInitialize->getCredentials(tmpSecurityProperties,
+ m_endpointObj->name().c_str());
LOGFINER("TcrConnection: after getCredentials ");
credentials = tmpAuthIniSecurityProperties;
}
@@ -252,10 +292,13 @@
size_t msgLength;
auto data = reinterpret_cast<char*>(
const_cast<uint8_t*>(handShakeMsg.getBuffer(&msgLength)));
- LOGFINE("Attempting handshake with endpoint %s for %s%s connection", endpoint,
+ LOGFINE("Attempting handshake with endpoint %s for %s%s connection",
+ endpointObj->name().c_str(),
isClientNotification ? (isSecondary ? "secondary " : "primary ") : "",
isClientNotification ? "subscription" : "client");
- ConnErrType error = sendData(data, msgLength, connectTimeout, false);
+ LOGDEBUG(std::string("Handshake bytes: (") + std::to_string(msgLength) +
+ "): " + Utils::convertBytesToString(data, msgLength));
+ ConnErrType error = sendData(data, msgLength, connectTimeout);
if (error == CONN_NOERR) {
auto acceptanceCode = readHandshakeData(1, connectTimeout);
@@ -266,7 +309,7 @@
LOGERROR("SSL is enabled on server, enable SSL in client as well");
AuthenticationRequiredException ex(
"SSL is enabled on server, enable SSL in client as well");
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throwException(ex);
}
@@ -299,12 +342,10 @@
///////////////////////////////////
////////////////////////// 3. Only when handshake is for subscription
///////////////////////////////////
- if (m_poolDM != nullptr) {
- if ((m_hasServerQueue == PRIMARY_SERVER ||
- m_hasServerQueue == NON_REDUNDANT_SERVER) &&
- isClientNotification) {
- m_poolDM->setPrimaryServerQueueSize(queueSize);
- }
+ if ((m_hasServerQueue == PRIMARY_SERVER ||
+ m_hasServerQueue == NON_REDUNDANT_SERVER) &&
+ isClientNotification) {
+ m_poolDM->setPrimaryServerQueueSize(queueSize);
}
if (!isClientNotification) {
@@ -326,7 +367,7 @@
}
auto recvMsgLenBytes = readHandshakeData(2, connectTimeout);
- auto dataInput3 = m_connectionManager->getCacheImpl()->createDataInput(
+ auto dataInput3 = m_connectionManager.getCacheImpl()->createDataInput(
reinterpret_cast<const uint8_t*>(recvMsgLenBytes.data()),
recvMsgLenBytes.size());
uint16_t recvMsgLen2 = dataInput3.readInt16();
@@ -334,7 +375,7 @@
if (!isClientNotification) {
auto deltaEnabledMsg = readHandshakeData(1, connectTimeout);
- auto di = m_connectionManager->getCacheImpl()->createDataInput(
+ auto di = m_connectionManager.getCacheImpl()->createDataInput(
reinterpret_cast<const uint8_t*>(deltaEnabledMsg.data()), 1);
ThinClientBaseDM::setDeltaEnabledOnServer(di.readBoolean());
}
@@ -349,19 +390,19 @@
case REPLY_AUTHENTICATION_FAILED: {
AuthenticationFailedException ex(
reinterpret_cast<char*>(recvMessage.data()));
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throwException(ex);
}
case REPLY_AUTHENTICATION_REQUIRED: {
AuthenticationRequiredException ex(
reinterpret_cast<char*>(recvMessage.data()));
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throwException(ex);
}
case REPLY_DUPLICATE_DURABLE_CLIENT: {
DuplicateDurableClientException ex(
reinterpret_cast<char*>(recvMessage.data()));
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throwException(ex);
}
case REPLY_REFUSED:
@@ -374,7 +415,7 @@
"Handshake rejected by server: " +
reinterpret_cast<char*>(recvMessage.data());
CacheServerException ex(message);
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throw ex;
}
default: {
@@ -388,16 +429,13 @@
" received from server in handshake: " +
reinterpret_cast<char*>(recvMessage.data());
MessageException ex(message);
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throw ex;
}
}
} else {
- int32_t lastError = ACE_OS::last_error();
- LOGFINE("Handshake failed, errno: %d: %s", lastError,
- ACE_OS::strerror(lastError));
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
if (error & CONN_TIMEOUT) {
throw TimeoutException(
"TcrConnection::TcrConnection: "
@@ -424,134 +462,74 @@
return false;
}
-Connector* TcrConnection::createConnection(
- const std::string& address, std::chrono::microseconds connectTimeout,
- int32_t maxBuffSizePool) {
+void TcrConnection::createConnection(const std::string& address,
+ std::chrono::microseconds connectTimeout,
+ int32_t maxBuffSizePool) {
Connector* socket = nullptr;
- auto& systemProperties = m_connectionManager->getCacheImpl()
+ auto& systemProperties = m_connectionManager.getCacheImpl()
->getDistributedSystem()
.getSystemProperties();
+
if (systemProperties.sslEnabled()) {
auto sniHostname = m_poolDM->getSNIProxyHostname();
auto sniPort = m_poolDM->getSNIPort();
if (sniHostname.empty()) {
- socket = new TcpSslConn(address, connectTimeout, maxBuffSizePool,
- systemProperties.sslTrustStore(),
- systemProperties.sslKeyStore(),
- systemProperties.sslKeystorePassword());
+ m_conn.reset(new TcpSslConn(address, connectTimeout, maxBuffSizePool,
+ systemProperties.sslTrustStore(),
+ systemProperties.sslKeyStore(),
+ systemProperties.sslKeystorePassword()));
} else {
- socket = new TcpSslConn(
+ m_conn.reset(new TcpSslConn(
address, connectTimeout, maxBuffSizePool, sniHostname, sniPort,
systemProperties.sslTrustStore(), systemProperties.sslKeyStore(),
- systemProperties.sslKeystorePassword());
+ systemProperties.sslKeystorePassword()));
}
} else {
- socket = new TcpConn(address, connectTimeout, maxBuffSizePool);
+ m_conn.reset(new TcpConn(address, connectTimeout, maxBuffSizePool));
}
- // as socket.init() calls throws exception...
- m_conn = socket;
- socket->init();
- return socket;
}
-/* The timeout behaviour for different methods is as follows:
- * receive():
- * Header: default timeout
- * Body: default timeout
- * sendRequest()/sendRequestForChunkedResponse():
- * default timeout during send; for receive:
- * Header: default timeout * default timeout retries to handle large payload
- * if a timeout other than default timeout is specified then
- * that is used instead
- * Body: default timeout
- */
-inline ConnErrType TcrConnection::receiveData(
- char* buffer, size_t length, std::chrono::microseconds receiveTimeoutSec,
- bool checkConnected, bool isNotificationMessage) {
- std::chrono::microseconds defaultWaitSecs =
- isNotificationMessage ? std::chrono::seconds(1) : std::chrono::seconds(2);
- if (defaultWaitSecs > receiveTimeoutSec) defaultWaitSecs = receiveTimeoutSec;
+ConnErrType TcrConnection::receiveData(
+ char* buffer, const size_t length,
+ const std::chrono::microseconds timeout) {
+ try {
+ const auto readBytes = m_conn->receive(
+ buffer, length,
+ std::chrono::duration_cast<std::chrono::milliseconds>(timeout));
- auto startLen = length;
-
- while (length > 0 && receiveTimeoutSec > std::chrono::microseconds::zero()) {
- if (checkConnected && !m_connected) {
- return CONN_IOERR;
+ m_poolDM->getStats().incReceivedBytes(static_cast<int64_t>(readBytes));
+ } catch (boost::system::system_error& ex) {
+ switch (ex.code().value()) {
+ case boost::asio::error::eof:
+ return CONN_NODATA;
+ case boost::asio::error::operation_aborted:
+ return CONN_TIMEOUT;
+ default:
+ break;
}
- if (receiveTimeoutSec < defaultWaitSecs) {
- defaultWaitSecs = receiveTimeoutSec;
- }
- auto readBytes = m_conn->receive(buffer, length, defaultWaitSecs);
- int32_t lastError = ACE_OS::last_error();
- length -= readBytes;
- if (length > 0 && lastError != ETIME && lastError != ETIMEDOUT) {
- return CONN_IOERR;
- }
- buffer += readBytes;
- /*
- Update pools byteRecieved stat here.
- readMessageChunked, readMessage, readHandshakeData,
- readHandshakeRawData, readHandShakeBytes, readHandShakeInt,
- readHandshakeString, all call TcrConnection::receiveData.
- */
- LOGDEBUG("TcrConnection::receiveData length = %zu defaultWaitSecs = %s",
- length, to_string(defaultWaitSecs).c_str());
- if (m_poolDM != nullptr) {
- LOGDEBUG("TcrConnection::receiveData readBytes = %zu", readBytes);
- m_poolDM->getStats().incReceivedBytes(static_cast<int64_t>(readBytes));
- }
- receiveTimeoutSec -= defaultWaitSecs;
- if ((length == startLen) && isNotificationMessage) { // no data read
- break;
- }
- }
- // Postconditions for checking bounds.
- return (length == 0 ? CONN_NOERR
- : (length == startLen ? CONN_NODATA : CONN_TIMEOUT));
-}
-
-inline ConnErrType TcrConnection::sendData(
- const char* buffer, size_t length, std::chrono::microseconds sendTimeout,
- bool checkConnected) {
- std::chrono::microseconds dummy{0};
- return sendData(dummy, buffer, length, sendTimeout, checkConnected);
-}
-
-inline ConnErrType TcrConnection::sendData(
- std::chrono::microseconds& timeSpent, const char* buffer, size_t length,
- std::chrono::microseconds sendTimeout, bool checkConnected) {
- std::chrono::microseconds defaultWaitSecs = std::chrono::seconds(2);
- if (defaultWaitSecs > sendTimeout) defaultWaitSecs = sendTimeout;
- LOGDEBUG(
- "before send len %zu sendTimeoutSec = %s checkConnected = %d "
- "m_connected "
- "%d",
- length, to_string(sendTimeout).c_str(), checkConnected, m_connected);
- while (length > 0 && sendTimeout > std::chrono::microseconds::zero()) {
- if (checkConnected && !m_connected) {
- return CONN_IOERR;
- }
- if (sendTimeout < defaultWaitSecs) {
- defaultWaitSecs = sendTimeout;
- }
- auto sentBytes = m_conn->send(buffer, length, defaultWaitSecs);
-
- length -= sentBytes;
- buffer += sentBytes;
- // we don't want to decrement the remaining time for the last iteration
- if (length == 0) {
- break;
- }
- int32_t lastError = ACE_OS::last_error();
- if (length > 0 && lastError != ETIME && lastError != ETIMEDOUT) {
- return CONN_IOERR;
- }
-
- timeSpent += defaultWaitSecs;
- sendTimeout -= defaultWaitSecs;
+ return CONN_IOERR;
}
- return (length == 0 ? CONN_NOERR : CONN_TIMEOUT);
+ return CONN_NOERR;
+}
+
+ConnErrType TcrConnection::sendData(const char* buffer, size_t length,
+ std::chrono::microseconds timeout) {
+ try {
+ m_conn->send(
+ buffer, length,
+ std::chrono::duration_cast<std::chrono::milliseconds>(timeout));
+ } catch (boost::system::system_error& ex) {
+ switch (ex.code().value()) {
+ case boost::asio::error::operation_aborted:
+ return CONN_TIMEOUT;
+ default:
+ break;
+ }
+ return CONN_IOERR;
+ }
+
+ return CONN_NOERR;
}
char* TcrConnection::sendRequest(const char* buffer, size_t len,
@@ -559,17 +537,17 @@
std::chrono::microseconds sendTimeoutSec,
std::chrono::microseconds receiveTimeoutSec,
int32_t request) {
- LOGDEBUG("TcrConnection::sendRequest");
- std::chrono::microseconds timeSpent{0};
-
- send(timeSpent, buffer, len, sendTimeoutSec);
+ const auto start = std::chrono::system_clock::now();
+ send(buffer, len, sendTimeoutSec);
+ const auto timeSpent = start - std::chrono::system_clock::now();
if (timeSpent >= receiveTimeoutSec) {
throwException(
TimeoutException("TcrConnection::send: connection timed out"));
}
- receiveTimeoutSec -= timeSpent;
+ receiveTimeoutSec -=
+ std::chrono::duration_cast<decltype(receiveTimeoutSec)>(timeSpent);
ConnErrType opErr = CONN_NOERR;
return readMessage(recvLen, receiveTimeoutSec, true, &opErr, false, request);
}
@@ -594,29 +572,19 @@
}
}
-bool TcrConnection::useReplyTimeout(const TcrMessage& request) const {
- auto messageType = request.getMessageType();
- return ((messageType == TcrMessage::QUERY) ||
- (messageType == TcrMessage::QUERY_WITH_PARAMETERS) ||
- (messageType == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) ||
- (messageType == TcrMessage::GETDURABLECQS_MSG_TYPE) ||
- (messageType == TcrMessage::EXECUTE_FUNCTION) ||
- (messageType == TcrMessage::EXECUTE_REGION_FUNCTION) ||
- (messageType == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP));
-}
-
std::chrono::microseconds TcrConnection::sendWithTimeouts(
const char* data, size_t len, std::chrono::microseconds sendTimeout,
std::chrono::microseconds receiveTimeout) {
- std::chrono::microseconds timeSpent{0};
- send(timeSpent, data, len, sendTimeout, true);
+ const auto start = std::chrono::system_clock::now();
+ send(data, len, sendTimeout, true);
+ const auto timeSpent = start - std::chrono::system_clock::now();
if (timeSpent >= receiveTimeout) {
throwException(
TimeoutException("TcrConnection::send: connection timed out"));
}
- return timeSpent;
+ return std::chrono::duration_cast<std::chrono::microseconds>(timeSpent);
}
bool TcrConnection::replyHasResult(const TcrMessage& request,
@@ -641,37 +609,23 @@
}
void TcrConnection::send(const char* buffer, size_t len,
- std::chrono::microseconds sendTimeoutSec,
- bool checkConnected) {
- std::chrono::microseconds dummy;
- send(dummy, buffer, len, sendTimeoutSec, checkConnected);
-}
-
-void TcrConnection::send(std::chrono::microseconds& timeSpent,
- const char* buffer, size_t len,
std::chrono::microseconds sendTimeoutSec, bool) {
- // LOGINFO("TcrConnection::send: [%p] sending request to endpoint %s;",
- //: this, m_endpoint);
-
LOGDEBUG(
"TcrConnection::send: [%p] sending request to endpoint %s; bytes: %s",
- this, m_endpoint, Utils::convertBytesToString(buffer, len).c_str());
+ this, m_endpointObj->name().c_str(),
+ Utils::convertBytesToString(buffer, len).c_str());
- ConnErrType error = sendData(timeSpent, buffer, len, sendTimeoutSec);
-
- LOGFINER(
- "TcrConnection::send: completed send request to endpoint %s "
- "with error: %d",
- m_endpoint, error);
-
- if (error != CONN_NOERR) {
- if (error == CONN_TIMEOUT) {
+ switch (sendData(buffer, len, sendTimeoutSec)) {
+ case CONN_NOERR:
+ break;
+ case CONN_TIMEOUT:
throwException(
TimeoutException("TcrConnection::send: connection timed out"));
- } else {
+ case CONN_NODATA:
+ case CONN_IOERR:
+ case CONN_OTHERERR:
throwException(
GeodeIOException("TcrConnection::send: connection failure"));
- }
}
}
@@ -694,12 +648,8 @@
headerTimeout = DEFAULT_READ_TIMEOUT * DEFAULT_TIMEOUT_RETRIES;
}
- LOGDEBUG("TcrConnection::readMessage: receiving reply from endpoint %s",
- m_endpoint);
+ error = receiveData(msg_header, HEADER_LENGTH, headerTimeout);
- error = receiveData(msg_header, HEADER_LENGTH, headerTimeout, true,
- isNotificationMessage);
- LOGDEBUG("TcrConnection::readMessage after recieve data");
if (error != CONN_NOERR) {
// the !isNotificationMessage ensures that notification channel
// gets the TimeoutException when no data was received and is ignored by
@@ -728,12 +678,12 @@
}
LOGDEBUG(
- "TcrConnection::readMessage: received header from endpoint %s; "
+ "TcrConnection::readMessage: [%p] received header from endpoint %s; "
"bytes: %s",
- m_endpoint,
+ this, m_endpointObj->name().c_str(),
Utils::convertBytesToString(msg_header, HEADER_LENGTH).c_str());
- auto input = m_connectionManager->getCacheImpl()->createDataInput(
+ auto input = m_connectionManager.getCacheImpl()->createDataInput(
reinterpret_cast<uint8_t*>(msg_header), HEADER_LENGTH);
// ignore msgType
input.readInt32();
@@ -758,8 +708,7 @@
if (isNotificationMessage) {
mesgBodyTimeout = receiveTimeoutSec * DEFAULT_TIMEOUT_RETRIES;
}
- error = receiveData(fullMessage + HEADER_LENGTH, msgLen, mesgBodyTimeout,
- true, isNotificationMessage);
+ error = receiveData(fullMessage + HEADER_LENGTH, msgLen, mesgBodyTimeout);
if (error != CONN_NOERR) {
delete[] fullMessage;
// the !isNotificationMessage ensures that notification channel
@@ -785,7 +734,7 @@
LOGDEBUG(
"TcrConnection::readMessage: received message body from "
"endpoint %s; bytes: %s",
- m_endpoint,
+ m_endpointObj->name().c_str(),
Utils::convertBytesToString(fullMessage + HEADER_LENGTH, msgLen).c_str());
return fullMessage;
@@ -800,7 +749,7 @@
LOGFINER(
"TcrConnection::readMessageChunked: receiving reply from "
"endpoint %s",
- m_endpoint);
+ m_endpointObj->name().c_str());
auto responseHeader = readResponseHeader(headerTimeout);
@@ -834,7 +783,7 @@
LOGFINER(
"TcrConnection::readMessageChunked: read full reply "
"from endpoint %s",
- m_endpoint);
+ m_endpointObj->name().c_str());
}
std::chrono::microseconds TcrConnection::calculateHeaderTimeout(
@@ -852,7 +801,7 @@
chunkedResponseHeader header;
auto error = receiveData(reinterpret_cast<char*>(receiveBuffer),
- HEADER_LENGTH, timeout, true, false);
+ HEADER_LENGTH, timeout);
if (error != CONN_NOERR) {
if (error & CONN_TIMEOUT) {
throwException(TimeoutException(
@@ -868,10 +817,10 @@
LOGDEBUG(
"TcrConnection::readResponseHeader: received header from "
"endpoint %s; bytes: %s",
- m_endpoint,
+ m_endpointObj->name().c_str(),
Utils::convertBytesToString(receiveBuffer, HEADER_LENGTH).c_str());
- auto input = m_connectionManager->getCacheImpl()->createDataInput(
+ auto input = m_connectionManager.getCacheImpl()->createDataInput(
receiveBuffer, HEADER_LENGTH);
header.messageType = input.readInt32();
header.numberOfParts = input.readInt32();
@@ -894,7 +843,7 @@
chunkHeader header;
auto error = receiveData(reinterpret_cast<char*>(receiveBuffer),
- CHUNK_HEADER_LENGTH, timeout, true, false);
+ CHUNK_HEADER_LENGTH, timeout);
if (error != CONN_NOERR) {
if (error & CONN_TIMEOUT) {
throwException(TimeoutException(
@@ -910,10 +859,10 @@
LOGDEBUG(
"TcrConnection::readChunkHeader: received header from "
"endpoint %s; bytes: %s",
- m_endpoint,
+ m_endpointObj->name().c_str(),
Utils::convertBytesToString(receiveBuffer, CHUNK_HEADER_LENGTH).c_str());
- auto input = m_connectionManager->getCacheImpl()->createDataInput(
+ auto input = m_connectionManager.getCacheImpl()->createDataInput(
receiveBuffer, CHUNK_HEADER_LENGTH);
header.chunkLength = input.readInt32();
header.flags = input.read();
@@ -929,7 +878,7 @@
std::chrono::microseconds timeout, int32_t chunkLength) {
std::vector<uint8_t> chunkBody(chunkLength);
auto error = receiveData(reinterpret_cast<char*>(chunkBody.data()),
- chunkLength, timeout, true, false);
+ chunkLength, timeout);
if (error != CONN_NOERR) {
if (error & CONN_TIMEOUT) {
throwException(
@@ -945,7 +894,7 @@
LOGDEBUG(
"TcrConnection::readChunkBody: received chunk body from endpoint "
"%s; bytes: %s",
- m_endpoint,
+ m_endpointObj->name().c_str(),
Utils::convertBytesToString(chunkBody.data(), chunkLength).c_str());
return chunkBody;
}
@@ -974,7 +923,7 @@
m_poolDM->getConnectionManager().getCacheImpl());
try {
if (!TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH &&
- !m_connectionManager->isNetDown()) {
+ !m_connectionManager.isNetDown()) {
send(closeMsg->getMsgData(), closeMsg->getMsgLength(),
std::chrono::seconds(2), false);
}
@@ -997,8 +946,8 @@
return message;
}
if ((error = receiveData(reinterpret_cast<char*>(message.data()), msgLength,
- connectTimeout, false)) != CONN_NOERR) {
- GF_SAFE_DELETE_CON(m_conn);
+ connectTimeout)) != CONN_NOERR) {
+ m_conn.reset();
if (error & CONN_TIMEOUT) {
throwException(
TimeoutException("TcrConnection::TcrConnection: "
@@ -1024,8 +973,8 @@
}
std::vector<int8_t> message(msgLength);
if ((error = receiveData(reinterpret_cast<char*>(message.data()), msgLength,
- connectTimeout, false)) != CONN_NOERR) {
- GF_SAFE_DELETE_CON(m_conn);
+ connectTimeout)) != CONN_NOERR) {
+ m_conn.reset();
if (error & CONN_TIMEOUT) {
throwException(
TimeoutException("TcrConnection::TcrConnection: "
@@ -1054,13 +1003,13 @@
int32_t arrayLength = static_cast<uint8_t>(arrayLenHeader[0]);
if (static_cast<int8_t>(arrayLenHeader[0]) == -2) {
auto arrayLengthBytes = readHandshakeData(2, connectTimeout);
- auto dataInput2 = m_connectionManager->getCacheImpl()->createDataInput(
+ auto dataInput2 = m_connectionManager.getCacheImpl()->createDataInput(
reinterpret_cast<const uint8_t*>(arrayLengthBytes.data()),
arrayLengthBytes.size());
arrayLength = dataInput2.readInt16();
} else if (static_cast<int8_t>(arrayLenHeader[0]) == -3) {
auto arrayLengthBytes = readHandshakeData(4, connectTimeout);
- auto dataInput2 = m_connectionManager->getCacheImpl()->createDataInput(
+ auto dataInput2 = m_connectionManager.getCacheImpl()->createDataInput(
reinterpret_cast<const uint8_t*>(arrayLengthBytes.data()),
arrayLengthBytes.size());
arrayLength = dataInput2.readInt32();
@@ -1105,16 +1054,16 @@
_GEODE_NEW(recvMessage, uint8_t[numberOfBytes]);
if ((error = receiveData(reinterpret_cast<char*>(recvMessage), numberOfBytes,
- connectTimeout, false)) != CONN_NOERR) {
+ connectTimeout)) != CONN_NOERR) {
if (error & CONN_TIMEOUT) {
_GEODE_SAFE_DELETE_ARRAY(recvMessage);
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throwException(
TimeoutException("TcrConnection::TcrConnection: "
"Timeout in handshake"));
} else {
_GEODE_SAFE_DELETE_ARRAY(recvMessage);
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throwException(
GeodeIOException("TcrConnection::TcrConnection: "
"Handshake failure"));
@@ -1131,24 +1080,23 @@
_GEODE_NEW(recvMessage, uint8_t[4]);
if ((error = receiveData(reinterpret_cast<char*>(recvMessage), 4,
- connectTimeout, false)) != CONN_NOERR) {
+ connectTimeout)) != CONN_NOERR) {
if (error & CONN_TIMEOUT) {
_GEODE_SAFE_DELETE_ARRAY(recvMessage);
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throwException(
TimeoutException("TcrConnection::TcrConnection: "
"Timeout in handshake"));
} else {
_GEODE_SAFE_DELETE_ARRAY(recvMessage);
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throwException(
GeodeIOException("TcrConnection::TcrConnection: "
"Handshake failure"));
}
}
- auto di =
- m_connectionManager->getCacheImpl()->createDataInput(recvMessage, 4);
+ auto di = m_connectionManager.getCacheImpl()->createDataInput(recvMessage, 4);
int32_t val = di.readInt32();
_GEODE_SAFE_DELETE_ARRAY(recvMessage);
@@ -1161,8 +1109,8 @@
ConnErrType error = CONN_NOERR;
char cstypeid;
- if (receiveData(&cstypeid, 1, connectTimeout, false) != CONN_NOERR) {
- GF_SAFE_DELETE_CON(m_conn);
+ if (receiveData(&cstypeid, 1, connectTimeout) != CONN_NOERR) {
+ m_conn.reset();
if (error & CONN_TIMEOUT) {
LOGFINE("Timeout receiving string typeid");
throwException(
@@ -1185,7 +1133,7 @@
}
case DSCode::CacheableASCIIString: {
auto lenBytes = readHandshakeData(2, connectTimeout);
- auto lenDI = m_connectionManager->getCacheImpl()->createDataInput(
+ auto lenDI = m_connectionManager.getCacheImpl()->createDataInput(
reinterpret_cast<const uint8_t*>(lenBytes.data()), lenBytes.size());
length = lenDI.readInt16();
@@ -1241,7 +1189,7 @@
case DSCode::CacheableUserData:
case DSCode::CacheableUserData4:
case DSCode::PDX: {
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
throwException(
GeodeIOException("TcrConnection::TcrConnection: "
"Handshake failure: Unexpected string type ID"));
@@ -1257,16 +1205,16 @@
std::vector<char> recvMessage(length + 1);
recvMessage[length] = '\0';
- if ((error = receiveData(recvMessage.data(), length, connectTimeout,
- false)) != CONN_NOERR) {
+ if ((error = receiveData(recvMessage.data(), length, connectTimeout)) !=
+ CONN_NOERR) {
if (error & CONN_TIMEOUT) {
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
LOGFINE("Timeout receiving string data");
throwException(
TimeoutException("TcrConnection::TcrConnection: "
"Timeout in handshake reading string bytes"));
} else {
- GF_SAFE_DELETE_CON(m_conn);
+ m_conn.reset();
LOGFINE("IO error receiving string data");
throwException(
GeodeIOException("TcrConnection::TcrConnection: "
@@ -1279,13 +1227,15 @@
return retval;
}
}
+
bool TcrConnection::hasExpired(const std::chrono::milliseconds& expiryTime) {
if (expiryTime <= std::chrono::milliseconds::zero()) {
return false;
}
auto variadicExpiryTime =
expiryTime + (expiryTime * m_expiryTimeVariancePercentage) / 100;
- return (clock::now() - m_creationTime) > variadicExpiryTime;
+ return (std::chrono::steady_clock::now() - m_creationTime) >
+ variadicExpiryTime;
}
bool TcrConnection::isIdle(const std::chrono::milliseconds& idleTime) {
@@ -1293,12 +1243,14 @@
return false;
}
- return (clock::now() - m_lastAccessed) > idleTime;
+ return (std::chrono::steady_clock::now() - m_lastAccessed) > idleTime;
}
-void TcrConnection::touch() { m_lastAccessed = clock::now(); }
+void TcrConnection::touch() {
+ m_lastAccessed = std::chrono::steady_clock::now();
+}
-TcrConnection::time_point TcrConnection::getLastAccessed() {
+std::chrono::steady_clock::time_point TcrConnection::getLastAccessed() {
return m_lastAccessed;
}
@@ -1316,20 +1268,11 @@
}
void TcrConnection::updateCreationTime() {
- m_creationTime = clock::now();
+ m_creationTime = std::chrono::steady_clock::now();
touch();
}
-TcrConnection::~TcrConnection() {
- LOGDEBUG("Tcrconnection destructor %p . conn ref to endopint %d", this,
- m_endpointObj->getConnRefCounter());
- m_endpointObj->addConnRefCounter(-1);
- if (m_conn != nullptr) {
- LOGDEBUG("closing the connection");
- m_conn->close();
- GF_SAFE_DELETE_CON(m_conn);
- }
-}
+TcrConnection::~TcrConnection() {}
bool TcrConnection::setAndGetBeingUsed(volatile bool isBeingUsed,
bool forTransaction) {
diff --git a/cppcache/src/TcrConnection.hpp b/cppcache/src/TcrConnection.hpp
index c0bbaf0..857ff0d 100644
--- a/cppcache/src/TcrConnection.hpp
+++ b/cppcache/src/TcrConnection.hpp
@@ -49,14 +49,6 @@
#define SECURITY_CREDENTIALS_NORMAL 1
#define SECURITY_MULTIUSER_NOTIFICATIONCHANNEL 3
-/** Closes and Deletes connection only if it exists */
-#define GF_SAFE_DELETE_CON(x) \
- do { \
- x->close(); \
- delete x; \
- x = nullptr; \
- } while (0)
-
namespace apache {
namespace geode {
namespace client {
@@ -93,9 +85,6 @@
class TcrConnectionManager;
class TcrConnection {
public:
- using clock = std::chrono::steady_clock;
- using time_point = clock::time_point;
-
/** Create one connection, endpoint is in format of hostname:portno
* It will do handshake with j-server. There're 2 types of handshakes:
* 1) handshake for request
@@ -120,43 +109,12 @@
* @param ports List of local ports for connections to endpoint
*/
bool initTcrConnection(
- TcrEndpoint* endpointObj, const char* endpoint,
+ std::shared_ptr<TcrEndpoint> endpointObj,
synchronized_set<std::unordered_set<uint16_t>>& ports,
bool isClientNotification = false, bool isSecondary = false,
std::chrono::microseconds connectTimeout = DEFAULT_CONNECT_TIMEOUT);
- TcrConnection(const TcrConnectionManager& connectionManager,
- volatile const bool& isConnected)
- : connectionId(0),
- m_connectionManager(&connectionManager),
- m_endpoint(nullptr),
- m_endpointObj(nullptr),
- m_connected(isConnected),
- m_conn(nullptr),
- m_hasServerQueue(NON_REDUNDANT_SERVER),
- m_queueSize(0),
- m_port(0),
- m_chunksProcessSema(0),
- m_isBeingUsed(false),
- m_isUsed(0),
- m_poolDM(nullptr) {
- auto nowTimePoint = clock::now().time_since_epoch();
- auto now_ms =
- std::chrono::duration_cast<std::chrono::milliseconds>(nowTimePoint)
- .count();
- auto now_s =
- std::chrono::duration_cast<std::chrono::seconds>(nowTimePoint).count();
- auto seed = (now_s * 1000) + (now_ms / 1000);
- srand(static_cast<unsigned int>(seed));
- int numbers = 21;
- int random = rand() % numbers + 1;
- if (random > 10) {
- random = random - numbers;
- }
- m_expiryTimeVariancePercentage = random;
- LOGDEBUG("m_expiryTimeVariancePercentage set to: %d",
- m_expiryTimeVariancePercentage);
- }
+ explicit TcrConnection(const TcrConnectionManager& connectionManager);
/* destroy the connection */
~TcrConnection();
@@ -227,11 +185,6 @@
std::chrono::microseconds sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
bool checkConnected = true);
- void send(std::chrono::microseconds& timeSpent, const char* buffer,
- size_t len,
- std::chrono::microseconds sendTimeoutSec = DEFAULT_WRITE_TIMEOUT,
- bool checkConnected = true);
-
/**
* This method is for receiving client notification. It will read 2 times as
* reading reply in sendRequest()
@@ -290,7 +243,7 @@
uint16_t inline getPort() { return m_port; }
- TcrEndpoint* getEndpointObject() const { return m_endpointObj; }
+ TcrEndpoint* getEndpointObject() const { return m_endpointObj.get(); }
bool isBeingUsed() { return m_isBeingUsed; }
bool setAndGetBeingUsed(
volatile bool isBeingUsed,
@@ -300,7 +253,7 @@
void touch();
bool hasExpired(const std::chrono::milliseconds& expiryTime);
bool isIdle(const std::chrono::milliseconds& idleTime);
- time_point getLastAccessed();
+ std::chrono::steady_clock::time_point getLastAccessed();
void updateCreationTime();
int64_t getConnectionId() {
@@ -314,12 +267,12 @@
}
const TcrConnectionManager& getConnectionManager() {
- return *m_connectionManager;
+ return m_connectionManager;
}
private:
int64_t connectionId;
- const TcrConnectionManager* m_connectionManager;
+ const TcrConnectionManager& m_connectionManager;
int m_expiryTimeVariancePercentage = 0;
std::chrono::microseconds calculateHeaderTimeout(
@@ -364,9 +317,9 @@
std::chrono::microseconds connectTimeout);
/** Create a normal or SSL connection */
- Connector* createConnection(
+ void createConnection(
const std::string& address,
- std::chrono::microseconds waitSeconds = DEFAULT_CONNECT_TIMEOUT,
+ std::chrono::microseconds wait = DEFAULT_CONNECT_TIMEOUT,
int32_t maxBuffSizePool = 0);
/**
@@ -399,25 +352,16 @@
* Send data to the connection till sendTimeout
*/
ConnErrType sendData(const char* buffer, size_t length,
- std::chrono::microseconds sendTimeout,
- bool checkConnected = true);
-
- ConnErrType sendData(std::chrono::microseconds& timeSpent, const char* buffer,
- size_t length, std::chrono::microseconds sendTimeout,
- bool checkConnected = true);
+ std::chrono::microseconds sendTimeout);
/**
* Read data from the connection till receiveTimeoutSec
*/
ConnErrType receiveData(char* buffer, size_t length,
- std::chrono::microseconds receiveTimeoutSec,
- bool checkConnected = true,
- bool isNotificationMessage = false);
+ std::chrono::microseconds receiveTimeoutSec);
- const char* m_endpoint;
- TcrEndpoint* m_endpointObj;
- volatile const bool& m_connected;
- Connector* m_conn;
+ std::shared_ptr<TcrEndpoint> m_endpointObj;
+ std::unique_ptr<Connector> m_conn;
ServerQueueStatus m_hasServerQueue;
int32_t m_queueSize;
uint16_t m_port;
@@ -425,8 +369,8 @@
// semaphore to synchronize with the chunked response processing thread
ACE_Semaphore m_chunksProcessSema;
- time_point m_creationTime;
- time_point m_lastAccessed;
+ std::chrono::steady_clock::time_point m_creationTime;
+ std::chrono::steady_clock::time_point m_lastAccessed;
// Disallow copy constructor and assignment operator.
TcrConnection(const TcrConnection&);
@@ -434,7 +378,6 @@
volatile bool m_isBeingUsed;
std::atomic<uint32_t> m_isUsed;
ThinClientPoolDM* m_poolDM;
- bool useReplyTimeout(const TcrMessage& request) const;
std::chrono::microseconds sendWithTimeouts(
const char* data, size_t len, std::chrono::microseconds sendTimeout,
std::chrono::microseconds receiveTimeout);
diff --git a/cppcache/src/TcrConnectionManager.cpp b/cppcache/src/TcrConnectionManager.cpp
index c605f35..7159a30 100644
--- a/cppcache/src/TcrConnectionManager.cpp
+++ b/cppcache/src/TcrConnectionManager.cpp
@@ -153,16 +153,9 @@
// cleanup of endpoints, when regions are destroyed via notification
{
auto &&guard = m_endpoints.make_lock();
- auto numEndPoints = m_endpoints.size();
- if (numEndPoints > 0) {
+ if (m_endpoints.size() > 0) {
LOGFINE("TCCM: endpoints remain in destructor");
}
- for (const auto &iter : m_endpoints) {
- auto ep = iter.second;
- LOGFINE("TCCM: forcing endpoint delete for %s in destructor",
- ep->name().c_str());
- _GEODE_SAFE_DELETE(ep);
- }
}
}
TcrConnectionManager::TEST_DURABLE_CLIENT_CRASH = false;
@@ -189,7 +182,7 @@
"TCCM 2: incremented region reference count for endpoint %s "
"to %d",
ep->name().c_str(), ep->numRegions());
- endpoints.push_back(ep);
+ endpoints.push_back(ep.get());
}
} else {
for (const auto &iter : endpointStrs) {
@@ -216,14 +209,15 @@
TcrEndpoint *TcrConnectionManager::addRefToTcrEndpoint(std::string endpointName,
ThinClientBaseDM *dm) {
- TcrEndpoint *ep = nullptr;
+ std::shared_ptr<TcrEndpoint> ep;
auto &&guard = m_endpoints.make_lock();
const auto &find = m_endpoints.find(endpointName);
if (find == m_endpoints.end()) {
// this endpoint does not exist
- ep = new TcrEndpoint(endpointName, m_cache, m_failoverSema, m_cleanupSema,
- m_redundancySema, dm, false);
+ ep = std::make_shared<TcrEndpoint>(endpointName, m_cache, m_failoverSema,
+ m_cleanupSema, m_redundancySema, dm,
+ false);
m_endpoints.emplace(endpointName, ep);
} else {
ep = find->second;
@@ -233,7 +227,7 @@
LOGFINER("TCCM: incremented region reference count for endpoint %s to %d",
ep->name().c_str(), ep->numRegions());
- return ep;
+ return ep.get();
}
void TcrConnectionManager::disconnect(ThinClientBaseDM *distMng,
@@ -325,7 +319,7 @@
std::vector<TcrEndpoint *> &endpoints) {
auto &&guard = m_endpoints.make_lock();
for (const auto &currItr : m_endpoints) {
- endpoints.push_back(currItr.second);
+ endpoints.push_back(currItr.second.get());
}
}
@@ -394,7 +388,7 @@
auto &&guard = m_endpoints.make_lock();
auto currItr = m_endpoints.begin();
while (currItr != m_endpoints.end()) {
- if (removeRefToEndpoint(currItr->second)) {
+ if (removeRefToEndpoint(currItr->second.get())) {
currItr = m_endpoints.begin();
} else {
currItr++;
diff --git a/cppcache/src/TcrConnectionManager.hpp b/cppcache/src/TcrConnectionManager.hpp
index ed8763b..682e090 100644
--- a/cppcache/src/TcrConnectionManager.hpp
+++ b/cppcache/src/TcrConnectionManager.hpp
@@ -50,6 +50,10 @@
*/
class TcrConnectionManager {
public:
+ using endpoint_map_type = synchronized_map<
+ std::unordered_map<std::string, std::shared_ptr<TcrEndpoint>>,
+ std::recursive_mutex>;
+
explicit TcrConnectionManager(CacheImpl* cache);
~TcrConnectionManager();
void init(bool isPool = false);
@@ -74,11 +78,7 @@
void setClientCrashTEST() { TEST_DURABLE_CLIENT_CRASH = true; }
volatile static bool TEST_DURABLE_CLIENT_CRASH;
- inline synchronized_map<std::unordered_map<std::string, TcrEndpoint*>,
- std::recursive_mutex>&
- getGlobalEndpoints() {
- return m_endpoints;
- }
+ inline endpoint_map_type& getGlobalEndpoints() { return m_endpoints; }
void getAllEndpoints(std::vector<TcrEndpoint*>& endpoints);
int getNumEndPoints();
@@ -96,10 +96,6 @@
bool getEndpointStatus(const std::string& endpoint);
- void addPoolEndpoints(TcrEndpoint* endpoint) {
- m_poolEndpointList.push_back(endpoint);
- }
-
bool isDurable() { return m_isDurable; }
bool haEnabled() { return m_redundancyManager->m_HAenabled; }
CacheImpl* getCacheImpl() const { return m_cache; }
@@ -141,10 +137,7 @@
private:
CacheImpl* m_cache;
volatile bool m_initGuard;
- synchronized_map<std::unordered_map<std::string, TcrEndpoint*>,
- std::recursive_mutex>
- m_endpoints;
- std::list<TcrEndpoint*> m_poolEndpointList;
+ endpoint_map_type m_endpoints;
// key is hostname:port
std::list<ThinClientBaseDM*> m_distMngrs;
diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
index 4d2a093..0ef91e2 100644
--- a/cppcache/src/TcrEndpoint.cpp
+++ b/cppcache/src/TcrEndpoint.cpp
@@ -72,9 +72,9 @@
m_isActiveEndpoint(false),
m_serverQueueStatus(NON_REDUNDANT_SERVER),
m_queueSize(0),
- m_noOfConnRefs(0),
m_distributedMemId(0),
- m_isServerQueueStatusSet(false) {
+ m_isServerQueueStatusSet(false),
+ m_connCreatedWhenMaxConnsIsZero(false) {
/*
m_name = Utils::convertHostToCanonicalForm(m_name.c_str() );
*/
@@ -144,9 +144,8 @@
if (locked) {
try {
LOGFINE("TcrEndpoint::createNewConnectionWL got lock");
- newConn =
- new TcrConnection(m_cacheImpl->tcrConnectionManager(), m_connected);
- newConn->initTcrConnection(this, m_name.c_str(), m_ports,
+ newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager());
+ newConn->initTcrConnection(shared_from_this(), m_ports,
isClientNotification, isSecondary,
connectTimeout);
@@ -193,10 +192,9 @@
try {
if (newConn == nullptr) {
if (!needtoTakeConnectLock() || !appThreadRequest) {
- newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager(),
- m_connected);
+ newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager());
bool authenticate = newConn->initTcrConnection(
- this, m_name.c_str(), m_ports, isClientNotification, isSecondary,
+ shared_from_this(), m_ports, isClientNotification, isSecondary,
connectTimeout);
if (authenticate) {
authenticateEndpoint(newConn);
@@ -499,7 +497,7 @@
void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) {
LOGDEBUG("Sending ping message to endpoint %s", m_name.c_str());
- if (!m_connected || m_noOfConnRefs == 0) {
+ if (!m_connected) {
LOGFINER("Skipping ping task for disconnected endpoint %s", m_name.c_str());
return;
}
@@ -722,7 +720,7 @@
inline bool TcrEndpoint::handleIOException(const std::string& message,
TcrConnection*& conn, bool) {
int32_t lastError = ACE_OS::last_error();
- if (lastError == ECONNRESET || lastError == EPIPE) {
+ if (lastError == ECONNRESET || lastError == EPIPE || lastError == ENOTCONN) {
_GEODE_SAFE_DELETE(conn);
} else {
closeConnection(conn);
@@ -806,7 +804,6 @@
}
}
size_t dataLen;
- LOGDEBUG("sendRequestConn: calling sendRequest");
auto data = conn->sendRequest(request.getMsgData(), request.getMsgLength(),
&dataLen, request.getTimeout(),
reply.getTimeout(), request.getMessageType());
@@ -887,9 +884,9 @@
auto timeout = requestedTimeout;
epFailure = false;
if (useEPPool) {
- if (m_maxConnections == 0) {
+ if (m_maxConnections == 0 && !m_connCreatedWhenMaxConnsIsZero) {
std::lock_guard<decltype(m_connectionLock)> guard(m_connectionLock);
- if (m_maxConnections == 0) {
+ if (m_maxConnections == 0 && !m_connCreatedWhenMaxConnsIsZero) {
LOGFINE(
"Creating a new connection when connection-pool-size system "
"property set to 0");
@@ -901,7 +898,7 @@
epFailure = true;
continue;
}
- m_maxConnections = 1;
+ m_connCreatedWhenMaxConnsIsZero = true;
}
}
}
@@ -1172,15 +1169,16 @@
void TcrEndpoint::closeConnection(TcrConnection*& conn) {
conn->close();
m_ports.erase(conn->getPort());
- _GEODE_SAFE_DELETE(conn);
+ try {
+ _GEODE_SAFE_DELETE(conn);
+ } catch (...) {
+ }
}
void TcrEndpoint::closeConnections() {
m_opConnections.close();
m_ports.clear();
- m_maxConnections = m_cacheImpl->getDistributedSystem()
- .getSystemProperties()
- .connectionPoolSize();
+ m_connCreatedWhenMaxConnsIsZero = false;
}
/*
diff --git a/cppcache/src/TcrEndpoint.hpp b/cppcache/src/TcrEndpoint.hpp
index cfbd63a..43b062a 100644
--- a/cppcache/src/TcrEndpoint.hpp
+++ b/cppcache/src/TcrEndpoint.hpp
@@ -48,7 +48,7 @@
class ThinClientPoolDM;
class QueryService;
-class TcrEndpoint {
+class TcrEndpoint : public std::enable_shared_from_this<TcrEndpoint> {
public:
TcrEndpoint(
const std::string& name, CacheImpl* cacheImpl,
@@ -58,6 +58,9 @@
virtual ~TcrEndpoint();
+ TcrEndpoint(const TcrEndpoint&) = delete;
+ TcrEndpoint& operator=(const TcrEndpoint&) = delete;
+
virtual GfErrType registerDM(bool clientNotification,
bool isSecondary = false,
bool isActiveEndpoint = false,
@@ -166,9 +169,6 @@
int32_t numberOfTimesFailed() { return m_numberOfTimesFailed; }
- void addConnRefCounter(int count) { m_noOfConnRefs += count; }
-
- int getConnRefCounter() { return m_noOfConnRefs; }
virtual uint16_t getDistributedMemberID() { return m_distributedMemId; }
virtual void setDistributedMemberID(uint16_t memId) {
m_distributedMemId = memId;
@@ -229,19 +229,14 @@
bool m_isActiveEndpoint;
ServerQueueStatus m_serverQueueStatus;
int32_t m_queueSize;
- std::atomic<int32_t> m_noOfConnRefs;
uint16_t m_distributedMemId;
bool m_isServerQueueStatusSet;
+ volatile bool m_connCreatedWhenMaxConnsIsZero;
bool compareTransactionIds(int32_t reqTransId, int32_t replyTransId,
std::string& failReason, TcrConnection* conn);
void closeConnections();
void setRetry(const TcrMessage& request, int& maxSendRetries);
- // number of connections to this endpoint
-
- // Disallow copy constructor and assignment operator.
- TcrEndpoint(const TcrEndpoint&);
- TcrEndpoint& operator=(const TcrEndpoint&);
};
} // namespace client
} // namespace geode
diff --git a/cppcache/src/ThinClientBaseDM.hpp b/cppcache/src/ThinClientBaseDM.hpp
index 2920876..61c3fe3 100644
--- a/cppcache/src/ThinClientBaseDM.hpp
+++ b/cppcache/src/ThinClientBaseDM.hpp
@@ -37,6 +37,7 @@
class TcrMessage;
class ThinClientRegion;
class TcrEndpoint;
+class TcrConnection;
class TcrConnectionManager;
class TcrMessageReply;
class TcrChunkedContext;
diff --git a/cppcache/src/ThinClientLocatorHelper.cpp b/cppcache/src/ThinClientLocatorHelper.cpp
index 89cc97d..357681c 100644
--- a/cppcache/src/ThinClientLocatorHelper.cpp
+++ b/cppcache/src/ThinClientLocatorHelper.cpp
@@ -33,6 +33,7 @@
#include "LocatorListResponse.hpp"
#include "QueueConnectionRequest.hpp"
#include "QueueConnectionResponse.hpp"
+#include "TcpConn.hpp"
#include "TcpSslConn.hpp"
#include "TcrConnectionManager.hpp"
#include "ThinClientPoolDM.hpp"
@@ -44,17 +45,12 @@
const size_t BUFF_SIZE = 3000;
const size_t DEFAULT_CONNECTION_RETRIES = 3;
-ThinClientLocatorHelper::ConnectionWrapper::~ConnectionWrapper() {
- if (conn_ != nullptr) {
- LOGDEBUG("Closing the locator connection");
- conn_->close();
- delete conn_;
- }
-}
-
ThinClientLocatorHelper::ThinClientLocatorHelper(
const std::vector<std::string>& locators, const ThinClientPoolDM* poolDM)
- : locators_(locators.begin(), locators.end()), m_poolDM(poolDM) {}
+ : locators_(locators.begin(), locators.end()),
+ m_poolDM(poolDM),
+ m_sniProxyHost(""),
+ m_sniProxyPort(0) {}
ThinClientLocatorHelper::ThinClientLocatorHelper(
const std::vector<std::string>& locators, const std::string& sniProxyHost,
@@ -85,15 +81,13 @@
return locators;
}
-ThinClientLocatorHelper::ConnectionWrapper
-ThinClientLocatorHelper::createConnection(
+std::unique_ptr<Connector> ThinClientLocatorHelper::createConnection(
const ServerLocation& location) const {
auto& sys_prop = m_poolDM->getConnectionManager()
.getCacheImpl()
->getDistributedSystem()
.getSystemProperties();
- Connector* conn;
const auto port = location.getPort();
auto timeout = sys_prop.connectTimeout();
const auto& hostname = location.getServerName();
@@ -101,23 +95,20 @@
if (sys_prop.sslEnabled()) {
if (m_sniProxyHost.empty()) {
- conn = new TcpSslConn(hostname, static_cast<uint16_t>(port), timeout,
- buffer_size, sys_prop.sslTrustStore(),
- sys_prop.sslKeyStore(),
- sys_prop.sslKeystorePassword());
+ return std::unique_ptr<Connector>(new TcpSslConn(
+ hostname, static_cast<uint16_t>(port), timeout, buffer_size,
+ sys_prop.sslTrustStore(), sys_prop.sslKeyStore(),
+ sys_prop.sslKeystorePassword()));
} else {
- conn = new TcpSslConn(hostname, timeout, buffer_size, m_sniProxyHost,
- m_sniProxyPort, sys_prop.sslTrustStore(),
- sys_prop.sslKeyStore(),
- sys_prop.sslKeystorePassword());
+ return std::unique_ptr<Connector>(new TcpSslConn(
+ hostname, static_cast<uint16_t>(port), m_sniProxyHost, m_sniProxyPort,
+ timeout, buffer_size, sys_prop.sslTrustStore(),
+ sys_prop.sslKeyStore(), sys_prop.sslKeystorePassword()));
}
} else {
- conn = new TcpConn(hostname, port, timeout, buffer_size);
+ return std::unique_ptr<Connector>(new TcpConn(
+ hostname, static_cast<uint16_t>(port), timeout, buffer_size));
}
-
- ConnectionWrapper cw{conn};
- cw->init();
- return cw;
}
std::shared_ptr<Serializable> ThinClientLocatorHelper::sendRequest(
@@ -141,9 +132,8 @@
return nullptr;
}
char buff[BUFF_SIZE];
- auto receivedLength =
- conn->receive(buff, BUFF_SIZE, m_poolDM->getReadTimeout());
- if (receivedLength <= 0) {
+ const auto receivedLength = conn->receive(buff, m_poolDM->getReadTimeout());
+ if (!receivedLength) {
return nullptr;
}
@@ -163,6 +153,8 @@
} catch (const Exception& excp) {
LOGFINE("Exception while querying locator: %s: %s", excp.getName().c_str(),
excp.what());
+ } catch (...) {
+ LOGFINE("Exception while querying locator");
}
return nullptr;
diff --git a/cppcache/src/ThinClientLocatorHelper.hpp b/cppcache/src/ThinClientLocatorHelper.hpp
index d59fab9..71ba7d1 100644
--- a/cppcache/src/ThinClientLocatorHelper.hpp
+++ b/cppcache/src/ThinClientLocatorHelper.hpp
@@ -40,6 +40,7 @@
class ThinClientPoolDM;
class Connector;
+class TcrConnection;
class ThinClientLocatorHelper {
public:
@@ -69,28 +70,6 @@
private:
/**
- * Auxiliary types
- */
-
- class ConnectionWrapper {
- private:
- Connector* conn_;
-
- public:
- ConnectionWrapper(const ConnectionWrapper&) = delete;
- ConnectionWrapper& operator=(const ConnectionWrapper&) = delete;
-
- explicit ConnectionWrapper(Connector* conn) : conn_(conn) {}
- ConnectionWrapper(ConnectionWrapper&& other) : conn_(other.conn_) {
- other.conn_ = nullptr;
- }
-
- ~ConnectionWrapper();
-
- Connector* operator->() { return conn_; }
- };
-
- /**
* Returns the number of connections retries per request
* @return Number of connection retries towards locators
*/
@@ -110,9 +89,10 @@
/**
* Creates a connection to the given locator
* @param location Locator ServerLocation
- * @return A connection wrapper for the locator
+ * @return A connection for the locator
*/
- ConnectionWrapper createConnection(const ServerLocation& location) const;
+ std::unique_ptr<Connector> createConnection(
+ const ServerLocation& location) const;
/**
* Sends a request to the given locator
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index d8b23b4..52eca1e 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -40,6 +40,14 @@
#include "statistics/PoolStatsSampler.hpp"
#include "util/exception.hpp"
+/** Closes and Deletes connection only if it exists */
+#define GF_SAFE_DELETE_CON(x) \
+ do { \
+ x->close(); \
+ delete x; \
+ x = nullptr; \
+ } while (0)
+
namespace apache {
namespace geode {
namespace client {
@@ -379,10 +387,26 @@
while (isRunning) {
m_connSema.acquire();
if (isRunning) {
- manageConnectionsInternal(isRunning);
- m_connSema.acquire();
+ try {
+ LOGFINE(
+ "ThinClientPoolDM::manageConnections: checking connections in "
+ "pool");
+
+ manageConnectionsInternal(isRunning);
+ } catch (const Exception& e) {
+ LOGERROR("ThinClientPoolDM::manageConnections: Geode Exception: \"%s\"",
+ e.what());
+ LOGERROR(e.getStackTrace());
+ } catch (const std::exception& e) {
+ LOGERROR(
+ "ThinClientPoolDM::manageConnections: Standard exception: \"%s\"",
+ e.what());
+ } catch (...) {
+ LOGERROR("ThinClientPoolDM::manageConnections: Unexpected exception");
+ }
}
}
+
LOGFINE("ThinClientPoolDM: ending manageConnections thread");
}
@@ -411,8 +435,9 @@
removelist.push_back(conn);
} else if (conn) {
auto nextIdle =
- _idle - std::chrono::duration_cast<std::chrono::milliseconds>(
- TcrConnection::clock::now() - conn->getLastAccessed());
+ _idle -
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() - conn->getLastAccessed());
if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
_nextIdle = nextIdle;
}
@@ -431,7 +456,10 @@
iter != removelist.end(); ++iter) {
TcrConnection* conn = *iter;
if (replaceCount <= 0) {
- GF_SAFE_DELETE_CON(conn);
+ try {
+ GF_SAFE_DELETE_CON(conn);
+ } catch (...) {
+ }
removeEPConnections(1, false);
getStats().incLoadCondDisconnects();
LOGDEBUG("Removed a connection");
@@ -442,21 +470,28 @@
/*hasExpired(conn) ? nullptr :*/ conn);
if (newConn) {
auto nextIdle =
- _idle - std::chrono::duration_cast<std::chrono::milliseconds>(
- TcrConnection::clock::now() - conn->getLastAccessed());
+ _idle -
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::steady_clock::now() - conn->getLastAccessed());
if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
_nextIdle = nextIdle;
}
put(newConn, false);
if (newConn != conn) {
- GF_SAFE_DELETE_CON(conn);
+ try {
+ GF_SAFE_DELETE_CON(conn);
+ } catch (...) {
+ }
removeEPConnections(1, false);
getStats().incLoadCondDisconnects();
LOGDEBUG("Removed a connection");
}
} else {
if (hasExpired(conn)) {
- GF_SAFE_DELETE_CON(conn);
+ try {
+ GF_SAFE_DELETE_CON(conn);
+ } catch (...) {
+ }
removeEPConnections(1, false);
getStats().incLoadCondDisconnects();
LOGDEBUG("Removed a connection");
@@ -465,7 +500,7 @@
auto nextIdle =
_idle -
std::chrono::duration_cast<std::chrono::milliseconds>(
- TcrConnection::clock::now() - conn->getLastAccessed());
+ std::chrono::steady_clock::now() - conn->getLastAccessed());
if (nextIdle > std::chrono::seconds::zero() && nextIdle < _nextIdle) {
_nextIdle = nextIdle;
}
@@ -565,7 +600,7 @@
->getEndpointForNewFwdConn(
outEndpoint, additionalLoc, excludeServers,
m_attrs->m_serverGrp, currentServer)) {
- throw IllegalStateException("Locator query failed");
+ throw IllegalStateException("Locator query failed selecting an endpoint");
}
// Update Locator stats
getStats().setLocators(
@@ -639,11 +674,12 @@
ep = addEP(cs->value());
} else if (!ep->connected()) {
LOGFINE(
- "ThinClientPoolDM::sendRequestToAllServers server not connected %s ",
+ "ThinClientPoolDM::sendRequestToAllServers server not connected "
+ "%s ",
cs->value().c_str());
}
auto funcExe = std::make_shared<FunctionExecution>();
- funcExe->setParameters(func, getResult, timeout, args, ep, this,
+ funcExe->setParameters(func, getResult, timeout, args, ep.get(), this,
resultCollectorLock, &rs, userAttr);
fePtrList.push_back(funcExe);
threadPool.perform(funcExe);
@@ -821,13 +857,6 @@
m_isDestroyed = true;
LOGDEBUG("ThinClientPoolDM::destroy( ): after close m_isDestroyed = %d ",
m_isDestroyed);
-
- for (const auto& iter : m_endpoints) {
- auto ep = iter.second;
- LOGFINE("ThinClientPoolDM: forcing endpoint delete for %s in destructor",
- ep->name().c_str());
- _GEODE_SAFE_DELETE(ep);
- }
}
if (m_poolSize != 0) {
LOGFINE("Pool connection size is not zero %d", m_poolSize.load());
@@ -1048,7 +1077,8 @@
}
LOGDEBUG(
- "ThinClientPoolDM::sendUserCredentials: Error after sending cred request "
+ "ThinClientPoolDM::sendUserCredentials: Error after sending cred "
+ "request "
"= %d ",
err);
@@ -1061,7 +1091,8 @@
case TcrMessage::EXCEPTION: {
if (err == GF_NOERR && conn) {
putInQueue(
- conn, isBGThread); // connFound is only relevant for Sticky conn.
+ conn,
+ isBGThread); // connFound is only relevant for Sticky conn.
}
// this will set error type if there is some server exception
err = ThinClientRegion::handleServerException(
@@ -1073,7 +1104,8 @@
default: {
if (err == GF_NOERR && conn) {
putInQueue(
- conn, isBGThread); // connFound is only relevant for Sticky conn.
+ conn,
+ isBGThread); // connFound is only relevant for Sticky conn.
}
LOGERROR(
"Unknown message type %d during secure response, possible "
@@ -1118,18 +1150,17 @@
TcrEndpoint* ThinClientPoolDM::getEndPoint(
const std::shared_ptr<BucketServerLocation>& serverLocation, int8_t&,
std::set<ServerLocation>& excludeServers) {
- TcrEndpoint* ep = nullptr;
+ std::shared_ptr<TcrEndpoint> ep = nullptr;
if (serverLocation->isValid()) {
if (excludeServer(serverLocation->getEpString(), excludeServers)) {
LOGFINE("ThinClientPoolDM::getEndPoint Exclude Server true for %s ",
serverLocation->getEpString().c_str());
- return ep;
+ return ep.get();
}
ep = getEndpoint(serverLocation->getEpString());
if (ep) {
- LOGDEBUG("Endpoint for single hop is %s", ep->name().c_str());
- return ep;
+ return ep.get();
}
// do for pool with endpoints. Add endpoint into m_endpoints only when we
@@ -1168,14 +1199,15 @@
}
}
- return ep;
+ return ep.get();
}
-TcrEndpoint* ThinClientPoolDM::getEndpoint(const std::string& endpointName) {
+std::shared_ptr<TcrEndpoint> ThinClientPoolDM::getEndpoint(
+ const std::string& endpointName) {
m_endpoints.make_lock();
const auto& find = m_endpoints.find(endpointName);
if (find == m_endpoints.end()) {
- return nullptr;
+ return {};
}
return find->second;
}
@@ -1343,7 +1375,8 @@
if (region != nullptr) {
LOGFINE(
- "Need to refresh pr-meta-data timeout in client only with refresh "
+ "Need to refresh pr-meta-data timeout in client only with "
+ "refresh "
"metadata");
auto* tcrRegion = dynamic_cast<ThinClientRegion*>(region.get());
tcrRegion->setMetaDataRefreshed(false);
@@ -1357,7 +1390,8 @@
"ThinClientPoolDM::sendSyncRequest: isUserNeedToReAuthenticate = %d ",
isUserNeedToReAuthenticate);
LOGDEBUG(
- "ThinClientPoolDM::sendSyncRequest: m_isMultiUserMode = %d conn = %p "
+ "ThinClientPoolDM::sendSyncRequest: m_isMultiUserMode = %d conn = "
+ "%p "
"type = %d",
m_isMultiUserMode, conn, type);
@@ -1425,7 +1459,10 @@
request.getMessageType() ==
TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP);
if (conn) {
- GF_SAFE_DELETE_CON(conn);
+ try {
+ GF_SAFE_DELETE_CON(conn);
+ } catch (...) {
+ }
}
excludeServers.insert(ServerLocation(ep->name()));
removeEPFromMetadataIfError(error, ep);
@@ -1447,7 +1484,8 @@
userAttr->unAuthenticateEP(ep);
}
LOGFINEST(
- "After getting AuthenticationRequiredException trying again.");
+ "After getting AuthenticationRequiredException trying "
+ "again.");
isAuthRequireExcepMaxTry--;
isAuthRequireExcep = true;
continue;
@@ -1471,8 +1509,8 @@
m_connManager.getCacheImpl()->getRegion(request.getRegionName());
if (region != nullptr) {
- if (!connFound) // max limit case then don't refresh otherwise always
- // refresh
+ if (!connFound) // max limit case then don't refresh otherwise
+ // always refresh
{
LOGFINE("Need to refresh pr-meta-data");
auto* tcrRegion = dynamic_cast<ThinClientRegion*>(region.get());
@@ -1566,7 +1604,8 @@
auto theEP = getEndpoint(epNameStr);
LOGFINE(
- "ThinClientPoolDM::getConnectionToAnEndPoint( ): Getting endpoint object "
+ "ThinClientPoolDM::getConnectionToAnEndPoint( ): Getting endpoint "
+ "object "
"for %s",
epNameStr.c_str());
if (theEP && theEP->connected()) {
@@ -1574,7 +1613,7 @@
"ThinClientPoolDM::getConnectionToAnEndPoint( ): Getting connection "
"for endpoint %s",
epNameStr.c_str());
- conn = getFromEP(theEP);
+ conn = getFromEP(theEP.get());
// if connection is null, possibly because there are no idle connections
// to this endpoint, create a new pool connection to this endpoint.
bool maxConnLimit = false;
@@ -1583,13 +1622,13 @@
"ThinClientPoolDM::getConnectionToAnEndPoint( ): Create connection "
"for endpoint %s",
epNameStr.c_str());
- error = createPoolConnectionToAEndPoint(conn, theEP, maxConnLimit);
+ error = createPoolConnectionToAEndPoint(conn, theEP.get(), maxConnLimit);
}
}
// if connection is null, it has failed to get a connection to the specified
- // endpoint. Get a connection to any other server and failover the transaction
- // to that server.
+ // endpoint. Get a connection to any other server and failover the
+ // transaction to that server.
if (!conn) {
std::set<ServerLocation> excludeServers;
bool maxConnLimit = false;
@@ -1609,7 +1648,8 @@
if (failoverErr != GF_NOERR) {
LOGFINE(
"ThinClientPoolDM::getConnectionToAnEndPoint( ):Failed to "
- "failover transaction to another server. From endpoint %s to %s",
+ "failover transaction to another server. From endpoint %s to "
+ "%s",
epNameStr.c_str(), conn->getEndpointObject()->name().c_str());
putInQueue(conn, false);
conn = nullptr;
@@ -1620,7 +1660,8 @@
if (!(conn && error == GF_NOERR)) {
LOGFINE(
- "ThinClientPoolDM::getConnectionToAEndPoint( ):Failed to connect to %s",
+ "ThinClientPoolDM::getConnectionToAEndPoint( ):Failed to connect to "
+ "%s",
epNameStr.c_str());
if (conn) {
_GEODE_SAFE_DELETE(conn);
@@ -1630,9 +1671,9 @@
return error;
}
-// Create a pool connection to specified endpoint. First checks if the number of
-// connections has exceeded the maximum allowed. If not, create a connection to
-// the specified endpoint. Else, throws an error.
+// Create a pool connection to specified endpoint. First checks if the number
+// of connections has exceeded the maximum allowed. If not, create a
+// connection to the specified endpoint. Else, throws an error.
GfErrType ThinClientPoolDM::createPoolConnectionToAEndPoint(
TcrConnection*& conn, TcrEndpoint* theEP, bool& maxConnLimit,
bool appThreadrequest) {
@@ -1652,7 +1693,8 @@
if (m_poolSize >= max) {
maxConnLimit = true;
LOGFINER(
- "ThinClientPoolDM::createPoolConnectionToAEndPoint( ): current pool "
+ "ThinClientPoolDM::createPoolConnectionToAEndPoint( ): current "
+ "pool "
"size has reached limit %d, %d",
m_poolSize.load(), max);
return error;
@@ -1695,6 +1737,7 @@
if (m_cliCallbackTask != nullptr) m_cliCallbackSema.release();
}
}
+
GfErrType ThinClientPoolDM::createPoolConnection(
TcrConnection*& conn, std::set<ServerLocation>& excludeServers,
bool& maxConnLimit, const TcrConnection* currentserver) {
@@ -1705,21 +1748,11 @@
}
int min = m_attrs->getMinConnections();
max = max > min ? max : min;
- LOGDEBUG(
- "ThinClientPoolDM::createPoolConnection( ): current pool size has "
- "reached limit %d, %d, %d",
- m_poolSize.load(), max, min);
conn = nullptr;
- {
- if (m_poolSize >= max) {
- LOGDEBUG(
- "ThinClientPoolDM::createPoolConnection( ): current pool size has "
- "reached limit %d, %d",
- m_poolSize.load(), max);
- maxConnLimit = true;
- return error;
- }
+ if (m_poolSize >= max) {
+ maxConnLimit = true;
+ return error;
}
bool fatal = false;
@@ -1730,7 +1763,7 @@
try {
epNameStr = selectEndpoint(excludeServers, currentserver);
} catch (const NoAvailableLocatorsException&) {
- LOGFINE("Locator query failed");
+ LOGFINE("Locator query failed while creating pool connection");
return GF_CACHE_LOCATOR_EXCEPTION;
} catch (const Exception&) {
LOGFINE("Endpoint selection failed");
@@ -1961,7 +1994,8 @@
ua->unAuthenticateEP(currentEndpoint);
}
LOGFINEST(
- "After getting AuthenticationRequiredException trying again.");
+ "After getting AuthenticationRequiredException trying "
+ "again.");
isAuthRequireExcepMaxTry--;
isAuthRequireExcep = true;
if (isAuthRequireExcepMaxTry >= 0) error = GF_NOERR;
@@ -1975,25 +2009,27 @@
return error;
}
-TcrEndpoint* ThinClientPoolDM::addEP(ServerLocation& serverLoc) {
- const auto endpointName =
- serverLoc.getServerName() + ":" + std::to_string(serverLoc.getPort());
- return addEP(endpointName);
+std::shared_ptr<TcrEndpoint> ThinClientPoolDM::addEP(
+ ServerLocation& serverLoc) {
+ return addEP(serverLoc.getServerName() + ":" +
+ std::to_string(serverLoc.getPort()));
}
-TcrEndpoint* ThinClientPoolDM::addEP(const std::string& endpointName) {
+std::shared_ptr<TcrEndpoint> ThinClientPoolDM::addEP(
+ const std::string& endpointName) {
std::lock_guard<decltype(m_endpointsLock)> guard(m_endpointsLock);
auto ep = getEndpoint(endpointName);
if (!ep) {
+ ep = createEP(endpointName.c_str());
LOGFINE("Created new endpoint %s for pool %s", endpointName.c_str(),
m_poolName.c_str());
- ep = createEP(endpointName.c_str());
if (!m_endpoints.emplace(endpointName, ep).second) {
LOGERROR("Failed to add endpoint %s to pool %s", endpointName.c_str(),
m_poolName.c_str());
}
}
+
// Update Server Stats
getStats().setServers(static_cast<int32_t>(m_endpoints.size()));
return ep;
@@ -2011,8 +2047,8 @@
if (endpoint->connected()) {
endpoint->pingServer(this);
if (!endpoint->connected()) {
- removeEPConnections(endpoint);
- removeCallbackConnection(endpoint);
+ removeEPConnections(endpoint.get());
+ removeCallbackConnection(endpoint.get());
}
}
}
@@ -2020,17 +2056,20 @@
void ThinClientPoolDM::updateLocatorList(std::atomic<bool>& isRunning) {
LOGFINE("Starting updateLocatorList thread for pool %s", m_poolName.c_str());
+
while (isRunning) {
m_updateLocatorListSema.acquire();
if (isRunning && !m_connManager.isNetDown()) {
(m_locHelper)->updateLocators(getServerGroup());
}
}
+
LOGFINE("Ending updateLocatorList thread for pool %s", m_poolName.c_str());
}
void ThinClientPoolDM::pingServer(std::atomic<bool>& isRunning) {
LOGFINE("Starting ping thread for pool %s", m_poolName.c_str());
+
while (isRunning) {
m_pingSema.acquire();
if (isRunning && !m_connManager.isNetDown()) {
@@ -2038,6 +2077,7 @@
m_pingSema.acquire();
}
}
+
LOGFINE("Ending ping thread for pool %s", m_poolName.c_str());
}
@@ -2383,8 +2423,9 @@
return m_connManager.checkDupAndAdd(eventid);
}
-TcrEndpoint* ThinClientPoolDM::createEP(const char* endpointName) {
- return new TcrPoolEndPoint(
+std::shared_ptr<TcrEndpoint> ThinClientPoolDM::createEP(
+ const char* endpointName) {
+ return std::make_shared<TcrPoolEndPoint>(
endpointName, m_connManager.getCacheImpl(), m_connManager.m_failoverSema,
m_connManager.m_cleanupSema, m_connManager.m_redundancySema, this);
}
diff --git a/cppcache/src/ThinClientPoolDM.hpp b/cppcache/src/ThinClientPoolDM.hpp
index 1c2d789..4e06399 100644
--- a/cppcache/src/ThinClientPoolDM.hpp
+++ b/cppcache/src/ThinClientPoolDM.hpp
@@ -102,9 +102,9 @@
TcrEndpoint* currentEndpoint) override;
void addConnection(TcrConnection* conn);
- TcrEndpoint* addEP(ServerLocation& serverLoc);
+ std::shared_ptr<TcrEndpoint> addEP(ServerLocation& serverLoc);
- TcrEndpoint* addEP(const std::string& endpointName);
+ std::shared_ptr<TcrEndpoint> addEP(const std::string& endpointName);
virtual void pingServer(std::atomic<bool>& isRunning);
virtual void updateLocatorList(std::atomic<bool>& isRunning);
virtual void cliCallback(std::atomic<bool>& isRunning);
@@ -186,8 +186,9 @@
protected:
ThinClientStickyManager* m_manager;
std::vector<std::string> m_canonicalHosts;
- synchronized_map<std::unordered_map<std::string, TcrEndpoint*>,
- std::recursive_mutex>
+ synchronized_map<
+ std::unordered_map<std::string, std::shared_ptr<TcrEndpoint>>,
+ std::recursive_mutex>
m_endpoints;
std::recursive_mutex m_endpointsLock;
std::recursive_mutex m_endpointSelectionLock;
@@ -245,7 +246,7 @@
bool& isServerException);
// get endpoint using the endpoint string
- TcrEndpoint* getEndpoint(const std::string& epNameStr);
+ std::shared_ptr<TcrEndpoint> getEndpoint(const std::string& epNameStr);
bool m_isSecurityOn;
bool m_isMultiUserMode;
@@ -274,7 +275,7 @@
const TcrConnection* currentServer = nullptr);
// TODO global - m_memId was volatile
std::unique_ptr<ClientProxyMembershipID> m_memId;
- virtual TcrEndpoint* createEP(const char* endpointName);
+ virtual std::shared_ptr<TcrEndpoint> createEP(const char* endpointName);
virtual void removeCallbackConnection(TcrEndpoint*) {}
bool excludeServer(std::string, std::set<ServerLocation>&);
diff --git a/cppcache/src/ThinClientPoolHADM.cpp b/cppcache/src/ThinClientPoolHADM.cpp
index f487bb7..7f162a8 100644
--- a/cppcache/src/ThinClientPoolHADM.cpp
+++ b/cppcache/src/ThinClientPoolHADM.cpp
@@ -312,8 +312,9 @@
}
}
-TcrEndpoint* ThinClientPoolHADM::createEP(const char* endpointName) {
- return new TcrPoolEndPoint(
+std::shared_ptr<TcrEndpoint> ThinClientPoolHADM::createEP(
+ const char* endpointName) {
+ return std::make_shared<TcrPoolEndPoint>(
endpointName, m_connManager.getCacheImpl(), m_connManager.m_failoverSema,
m_connManager.m_cleanupSema, m_redundancySema, this);
}
diff --git a/cppcache/src/ThinClientPoolHADM.hpp b/cppcache/src/ThinClientPoolHADM.hpp
index 97a332b..5bd45b6 100644
--- a/cppcache/src/ThinClientPoolHADM.hpp
+++ b/cppcache/src/ThinClientPoolHADM.hpp
@@ -115,7 +115,7 @@
ExpiryTaskManager::id_type m_servermonitorTaskId;
int checkRedundancy(const ACE_Time_Value&, const void*);
- TcrEndpoint* createEP(const char* endpointName) override;
+ std::shared_ptr<TcrEndpoint> createEP(const char* endpointName) override;
void removeCallbackConnection(TcrEndpoint*) override;
diff --git a/cppcache/src/ThinClientRedundancyManager.cpp b/cppcache/src/ThinClientRedundancyManager.cpp
index 8538da6..03ac354 100644
--- a/cppcache/src/ThinClientRedundancyManager.cpp
+++ b/cppcache/src/ThinClientRedundancyManager.cpp
@@ -232,12 +232,12 @@
outEndpoints = selectServers(howMany, exclEndPts);
for (std::list<ServerLocation>::iterator it = outEndpoints.begin();
it != outEndpoints.end(); it++) {
- TcrEndpoint* ep = m_poolHADM->addEP(*it);
+ auto ep = m_poolHADM->addEP(*it);
LOGDEBUG(
"ThinClientRedundancyManager::maintainRedundancyLevel(): Adding "
"endpoint %s to nonredundant list.",
ep->name().c_str());
- m_nonredundantEndpoints.push_back(ep);
+ m_nonredundantEndpoints.push_back(ep.get());
}
}
@@ -949,7 +949,7 @@
}
}
-synchronized_map<std::unordered_map<std::string, TcrEndpoint*>,
+synchronized_map<std::unordered_map<std::string, std::shared_ptr<TcrEndpoint>>,
std::recursive_mutex>&
ThinClientRedundancyManager::updateAndSelectEndpoints() {
// 38196 Fix: For durable clients reconnect
@@ -983,8 +983,7 @@
void ThinClientRedundancyManager::getAllEndpoints(
std::vector<TcrEndpoint*>& endpoints) {
- TcrEndpoint* maxQEp = nullptr;
- TcrEndpoint* primaryEp = nullptr;
+ std::shared_ptr<TcrEndpoint> maxQEp, primaryEp;
auto& selectedEndpoints = updateAndSelectEndpoints();
for (const auto& currItr : selectedEndpoints) {
@@ -998,13 +997,13 @@
m_poolHADM->addConnection(statusConn);
}
if (status == REDUNDANT_SERVER) {
- if (maxQEp == nullptr) {
+ if (!maxQEp) {
maxQEp = ep;
} else if (ep->getServerQueueSize() > maxQEp->getServerQueueSize()) {
- insertEPInQueueSizeOrder(maxQEp, endpoints);
+ insertEPInQueueSizeOrder(maxQEp.get(), endpoints);
maxQEp = ep;
} else {
- insertEPInQueueSizeOrder(ep, endpoints);
+ insertEPInQueueSizeOrder(ep.get(), endpoints);
}
LOGDEBUG(
"ThinClientRedundancyManager::getAllEndpoints(): sorting "
@@ -1016,33 +1015,33 @@
"ThinClientRedundancyManager::getAllEndpoints(): sorting "
"endpoints, found primary endpoint.");
} else {
- endpoints.push_back(currItr.second);
+ endpoints.push_back(currItr.second.get());
LOGDEBUG(
"ThinClientRedundancyManager::getAllEndpoints(): sorting "
"endpoints, found nonredundant endpoint.");
}
} else {
- endpoints.push_back(currItr.second);
+ endpoints.push_back(currItr.second.get());
}
//(*currItr)++;
}
// Add Endpoint with Max Queuesize at the last and Primary at first position
if (isDurable()) {
- if (maxQEp != nullptr) {
- endpoints.push_back(maxQEp);
+ if (maxQEp) {
+ endpoints.push_back(maxQEp.get());
LOGDEBUG(
"ThinClientRedundancyManager::getAllEndpoints(): sorting endpoints, "
"pushing max-q endpoint at back.");
}
- if (primaryEp != nullptr) {
- if (m_redundancyLevel == 0 || maxQEp == nullptr) {
- endpoints.push_back(primaryEp);
+ if (primaryEp) {
+ if (m_redundancyLevel == 0 || !maxQEp) {
+ endpoints.push_back(primaryEp.get());
LOGDEBUG(
"ThinClientRedundancyManager::getAllEndpoints(): sorting "
"endpoints, pushing primary at back.");
} else {
- endpoints.insert(endpoints.begin(), primaryEp);
+ endpoints.insert(endpoints.begin(), primaryEp.get());
LOGDEBUG(
"ThinClientRedundancyManager::getAllEndpoints(): sorting "
"endpoints, inserting primary at head.");
diff --git a/cppcache/src/ThinClientRedundancyManager.hpp b/cppcache/src/ThinClientRedundancyManager.hpp
index cc45395..c048e0d 100644
--- a/cppcache/src/ThinClientRedundancyManager.hpp
+++ b/cppcache/src/ThinClientRedundancyManager.hpp
@@ -118,8 +118,9 @@
void moveEndpointToLast(std::vector<TcrEndpoint*>& epVector,
TcrEndpoint* targetEp);
- synchronized_map<std::unordered_map<std::string, TcrEndpoint*>,
- std::recursive_mutex>&
+ synchronized_map<
+ std::unordered_map<std::string, std::shared_ptr<TcrEndpoint>>,
+ std::recursive_mutex>&
updateAndSelectEndpoints();
void getAllEndpoints(std::vector<TcrEndpoint*>& endpoints);
diff --git a/examples/cmake/FindGeodeNative.cmake.in b/examples/cmake/FindGeodeNative.cmake.in
index 906fec0..8c7dc47 100644
--- a/examples/cmake/FindGeodeNative.cmake.in
+++ b/examples/cmake/FindGeodeNative.cmake.in
@@ -130,7 +130,6 @@
IMPORTED_LOCATION "${@PRODUCT_NAME_NOSPACE@_CPP_LIBRARY}"
INTERFACE_INCLUDE_DIRECTORIES "${@PRODUCT_NAME_NOSPACE@_CPP_INCLUDE_DIR}")
endif()
-
set(@PRODUCT_NAME_NOSPACE@_DOTNET_TARGET "@PRODUCT_NAME_NOSPACE@::dotnet")
if(NOT TARGET ${@PRODUCT_NAME_NOSPACE@_DOTNET_TARGET})
add_library(${@PRODUCT_NAME_NOSPACE@_DOTNET_TARGET} UNKNOWN IMPORTED)
diff --git a/examples/cpp/sslputget/CMakeLists.txt.in b/examples/cpp/sslputget/CMakeLists.txt.in
index d84fa79..1f160f9 100644
--- a/examples/cpp/sslputget/CMakeLists.txt.in
+++ b/examples/cpp/sslputget/CMakeLists.txt.in
@@ -20,7 +20,7 @@
set(CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/../../cmake)
set(CMAKE_CXX_STANDARD 11)
-find_package(@PRODUCT_NAME_NOSPACE@ REQUIRED COMPONENTS cpp crypto)
+find_package(@PRODUCT_NAME_NOSPACE@ REQUIRED COMPONENTS cpp)
add_executable(${PROJECT_NAME} main.cpp)
diff --git a/examples/dotnet/sslputget/CMakeLists.txt.in b/examples/dotnet/sslputget/CMakeLists.txt.in
index e782e43..990e735 100644
--- a/examples/dotnet/sslputget/CMakeLists.txt.in
+++ b/examples/dotnet/sslputget/CMakeLists.txt.in
@@ -19,7 +19,7 @@
set(CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/../../cmake)
-find_package(@PRODUCT_NAME_NOSPACE@ REQUIRED COMPONENTS dotnet crypto)
+find_package(@PRODUCT_NAME_NOSPACE@ REQUIRED COMPONENTS dotnet)
add_executable(${PROJECT_NAME} Program.cs)
diff --git a/tests/cpp/fwklib/TcpIpc.cpp b/tests/cpp/fwklib/TcpIpc.cpp
index 128966b..aadaa98 100644
--- a/tests/cpp/fwklib/TcpIpc.cpp
+++ b/tests/cpp/fwklib/TcpIpc.cpp
@@ -106,21 +106,6 @@
ACE_OS::signal(SIGPIPE, SIG_IGN); // Ignore broken pipe
}
-bool TcpIpc::listen(int32_t waitSecs) {
- if (m_ipaddr.empty()) {
- FWKSEVERE("Listen failed, address not set.");
- return false;
- }
- ACE_INET_Addr addr(m_ipaddr.c_str());
- ACE_SOCK_Acceptor listener(addr, 1);
-
- if (listener.accept(*m_io, nullptr, new ACE_Time_Value(waitSecs)) != 0) {
- FWKSEVERE("Accept failed with errno: " << errno);
- return false;
- }
- return true;
-}
-
bool TcpIpc::accept(ACE_SOCK_Acceptor *acceptor, int32_t waitSecs) {
if (acceptor->accept(*m_io, nullptr, new ACE_Time_Value(waitSecs)) != 0) {
FWKSEVERE("Accept failed with errno: " << errno);
diff --git a/tests/cpp/fwklib/TcpIpc.hpp b/tests/cpp/fwklib/TcpIpc.hpp
index affea43..d714d43 100644
--- a/tests/cpp/fwklib/TcpIpc.hpp
+++ b/tests/cpp/fwklib/TcpIpc.hpp
@@ -58,7 +58,6 @@
void close();
- bool listen(int32_t waitSecs = 0);
bool accept(ACE_SOCK_Acceptor* acceptor, int32_t waitSecs = 0);
bool connect(int32_t waitSecs = 0);