GEODE-8693: throw NotConnectedException in certain situation (#690)
* throw NotConnectedException if server goes down while execute with onServers
diff --git a/cppcache/integration/test/FunctionExecutionTest.cpp b/cppcache/integration/test/FunctionExecutionTest.cpp
index 0a28b14..f1cc760 100644
--- a/cppcache/integration/test/FunctionExecutionTest.cpp
+++ b/cppcache/integration/test/FunctionExecutionTest.cpp
@@ -183,7 +183,9 @@
for (decltype(resultArray->size()) i = 0; i < resultArray->size(); i++) {
auto value =
std::dynamic_pointer_cast<CacheableString>(resultArray->at(i));
- resultList.push_back(value->toString());
+ if (value != nullptr) {
+ resultList.push_back(value->toString());
+ }
}
}
@@ -251,3 +253,79 @@
resultList[i + ON_SERVERS_TEST_REGION_ENTRIES_SIZE / 2]);
}
}
+
+void executeTestFunctionOnLoopAndExpectNotConnectedException(
+ std::shared_ptr<apache::geode::client::Pool> pool) {
+ // Filter on odd keys
+ auto routingObj = CacheableVector::create();
+ for (int i = 0; i < ON_SERVERS_TEST_REGION_ENTRIES_SIZE; i++) {
+ if (i % 2 == 0) {
+ continue;
+ }
+ routingObj->push_back(CacheableString::create("KEY--" + std::to_string(i)));
+ }
+
+ auto execution = FunctionService::onServers(pool);
+
+ ASSERT_THROW(
+ {
+ while (true) {
+ // This call must eventually throw the NotConnectedException
+ auto rc =
+ execution.withArgs(routingObj).execute("MultiGetFunctionISlow");
+
+ auto executeFunctionResult = rc->getResult();
+
+ auto resultList = serverResultsToStrings(executeFunctionResult);
+
+ // Executed on 2 servers, we should have two sets of results
+ ASSERT_EQ(resultList.size(), ON_SERVERS_TEST_REGION_ENTRIES_SIZE);
+ }
+ },
+ apache::geode::client::NotConnectedException);
+}
+
+TEST(FunctionExecutionTest, OnServersOneServerGoesDown) {
+ Cluster cluster{
+ LocatorCount{1}, ServerCount{2},
+ CacheXMLFiles(
+ {std::string(getFrameworkString(FrameworkVariable::TestCacheXmlDir)) +
+ "/func_cacheserver1_pool.xml",
+ std::string(getFrameworkString(FrameworkVariable::TestCacheXmlDir)) +
+ "/func_cacheserver2_pool.xml"})};
+
+ cluster.start([&]() {
+ cluster.getGfsh()
+ .deploy()
+ .jar(getFrameworkString(FrameworkVariable::JavaObjectJarPath))
+ .execute();
+ });
+
+ auto cache = CacheFactory().set("log-level", "debug").create();
+ auto poolFactory = cache.getPoolManager().createFactory();
+
+ cluster.applyLocators(poolFactory);
+
+ auto pool =
+ poolFactory.setLoadConditioningInterval(std::chrono::milliseconds::zero())
+ .setIdleTimeout(std::chrono::milliseconds::zero())
+ .create("pool");
+
+ auto region = cache.createRegionFactory(RegionShortcut::PROXY)
+ .setPoolName("pool")
+ .create("partition_region");
+
+ for (int i = 0; i < ON_SERVERS_TEST_REGION_ENTRIES_SIZE; i++) {
+ region->put("KEY--" + std::to_string(i), "VALUE--" + std::to_string(i));
+ }
+
+ auto threadAux = std::make_shared<std::thread>(
+ executeTestFunctionOnLoopAndExpectNotConnectedException, pool);
+
+ // Sleep a bit to allow for some successful responses before the exception
+ std::this_thread::sleep_for(std::chrono::seconds(5));
+
+ cluster.getServers()[1].stop();
+
+ threadAux->join();
+}
diff --git a/cppcache/src/CqQueryImpl.cpp b/cppcache/src/CqQueryImpl.cpp
index 4b1f26d..eec3da4 100644
--- a/cppcache/src/CqQueryImpl.cpp
+++ b/cppcache/src/CqQueryImpl.cpp
@@ -385,7 +385,7 @@
GfErrType err = GF_NOERR;
err = m_tccdm->sendSyncRequest(msg, reply);
if (err != GF_NOERR) {
- LOGDEBUG("CqQueryImpl::executeCqWithInitialResults errorred!!!!");
+ LOGDEBUG("CqQueryImpl::executeCqWithInitialResults failed!!!!");
throwExceptionIfError("CqQuery::executeCqWithInitialResults:", err);
}
if (reply.getMessageType() == TcrMessage::EXCEPTION ||
diff --git a/cppcache/src/ExecutionImpl.cpp b/cppcache/src/ExecutionImpl.cpp
index c805a8e..cda2658 100644
--- a/cppcache/src/ExecutionImpl.cpp
+++ b/cppcache/src/ExecutionImpl.cpp
@@ -429,33 +429,20 @@
std::shared_ptr<CacheableString> exceptionPtr = nullptr;
GfErrType err = tcrdm->sendRequestToAllServers(
func.c_str(), getResult, timeout, m_args, m_rc, exceptionPtr);
- if (exceptionPtr != nullptr && err != GF_NOERR) {
- LOGDEBUG("Execute errorred: %d", err);
- // throw FunctionExecutionException( "Execute: failed to execute function
- // with server." );
- if (err == GF_CACHESERVER_EXCEPTION) {
- throw FunctionExecutionException(
- "Execute: failed to execute function with server.");
- } else {
- throwExceptionIfError("Execute", err);
- }
- }
-
- if (err == GF_AUTHENTICATION_FAILED_EXCEPTION ||
- err == GF_NOT_AUTHORIZED_EXCEPTION ||
- err == GF_AUTHENTICATION_REQUIRED_EXCEPTION) {
- throwExceptionIfError("Execute", err);
- }
if (err != GF_NOERR) {
+ LOGDEBUG("Execute failed: %d", err);
if (err == GF_CACHESERVER_EXCEPTION) {
- auto message = std::string("Execute: exception at the server side: ") +
- exceptionPtr->value().c_str();
+ std::string message;
+ if (exceptionPtr) {
+ message = std::string("Execute: exception at the server side: ") +
+ exceptionPtr->value().c_str();
+ } else {
+ message = "Execute: failed to execute function with server.";
+ }
throw FunctionExecutionException(message);
} else {
- LOGDEBUG("Execute errorred with server exception: %d", err);
- throw FunctionExecutionException(
- "Execute: failed to execute function on servers.");
+ throwExceptionIfError("Execute", err);
}
}
}
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index a828a5b..d8b23b4 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -671,11 +671,12 @@
err == GF_NOT_AUTHORIZED_EXCEPTION ||
err == GF_AUTHENTICATION_REQUIRED_EXCEPTION) {
finalErrorReturn = err;
- } else if (!(finalErrorReturn == GF_AUTHENTICATION_FAILED_EXCEPTION ||
- finalErrorReturn == GF_NOT_AUTHORIZED_EXCEPTION ||
- finalErrorReturn ==
- GF_AUTHENTICATION_REQUIRED_EXCEPTION)) // returning auth
- // errors
+ } else if ((err != GF_NOERR) &&
+ (!(finalErrorReturn == GF_AUTHENTICATION_FAILED_EXCEPTION ||
+ finalErrorReturn == GF_NOT_AUTHORIZED_EXCEPTION ||
+ finalErrorReturn ==
+ GF_AUTHENTICATION_REQUIRED_EXCEPTION))) // returning auth
+ // errors
// to client..preference
// over other errors..
{
@@ -2415,7 +2416,7 @@
m_error = m_poolDM->sendRequestToEP(request, reply, m_ep);
m_error = m_poolDM->handleEPError(m_ep, reply, m_error);
if (m_error != GF_NOERR) {
- if (m_error == GF_NOTCON || m_error == GF_IOERR) {
+ if (m_error == GF_NOTCON) {
return GF_NOERR; // if server is unavailable its not an error for
// functionexec OnServers() case
}
diff --git a/tests/javaobject/MultiGetFunctionISlow.java b/tests/javaobject/MultiGetFunctionISlow.java
new file mode 100755
index 0000000..0d33d05
--- /dev/null
+++ b/tests/javaobject/MultiGetFunctionISlow.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package javaobject;
+
+import org.apache.geode.cache.execute.FunctionContext;
+
+public class MultiGetFunctionISlow extends MultiGetFunctionI {
+
+ public void execute(FunctionContext context) {
+ try {
+ Thread.sleep(4000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ super.execute(context);
+ }
+
+ @Override
+ public String getId() {
+ return "MultiGetFunctionISlow";
+ }
+}