GEODE-8693: throw NotConnectedException in certain situation (#690)

* throw NotConnectedException if server goes down while execute with onServers
diff --git a/cppcache/integration/test/FunctionExecutionTest.cpp b/cppcache/integration/test/FunctionExecutionTest.cpp
index 0a28b14..f1cc760 100644
--- a/cppcache/integration/test/FunctionExecutionTest.cpp
+++ b/cppcache/integration/test/FunctionExecutionTest.cpp
@@ -183,7 +183,9 @@
     for (decltype(resultArray->size()) i = 0; i < resultArray->size(); i++) {
       auto value =
           std::dynamic_pointer_cast<CacheableString>(resultArray->at(i));
-      resultList.push_back(value->toString());
+      if (value != nullptr) {
+        resultList.push_back(value->toString());
+      }
     }
   }
 
@@ -251,3 +253,79 @@
               resultList[i + ON_SERVERS_TEST_REGION_ENTRIES_SIZE / 2]);
   }
 }
+
+void executeTestFunctionOnLoopAndExpectNotConnectedException(
+    std::shared_ptr<apache::geode::client::Pool> pool) {
+  // Filter on odd keys
+  auto routingObj = CacheableVector::create();
+  for (int i = 0; i < ON_SERVERS_TEST_REGION_ENTRIES_SIZE; i++) {
+    if (i % 2 == 0) {
+      continue;
+    }
+    routingObj->push_back(CacheableString::create("KEY--" + std::to_string(i)));
+  }
+
+  auto execution = FunctionService::onServers(pool);
+
+  ASSERT_THROW(
+      {
+        while (true) {
+          // This call must eventually throw the NotConnectedException
+          auto rc =
+              execution.withArgs(routingObj).execute("MultiGetFunctionISlow");
+
+          auto executeFunctionResult = rc->getResult();
+
+          auto resultList = serverResultsToStrings(executeFunctionResult);
+
+          // Executed on 2 servers, we should have two sets of results
+          ASSERT_EQ(resultList.size(), ON_SERVERS_TEST_REGION_ENTRIES_SIZE);
+        }
+      },
+      apache::geode::client::NotConnectedException);
+}
+
+TEST(FunctionExecutionTest, OnServersOneServerGoesDown) {
+  Cluster cluster{
+      LocatorCount{1}, ServerCount{2},
+      CacheXMLFiles(
+          {std::string(getFrameworkString(FrameworkVariable::TestCacheXmlDir)) +
+               "/func_cacheserver1_pool.xml",
+           std::string(getFrameworkString(FrameworkVariable::TestCacheXmlDir)) +
+               "/func_cacheserver2_pool.xml"})};
+
+  cluster.start([&]() {
+    cluster.getGfsh()
+        .deploy()
+        .jar(getFrameworkString(FrameworkVariable::JavaObjectJarPath))
+        .execute();
+  });
+
+  auto cache = CacheFactory().set("log-level", "debug").create();
+  auto poolFactory = cache.getPoolManager().createFactory();
+
+  cluster.applyLocators(poolFactory);
+
+  auto pool =
+      poolFactory.setLoadConditioningInterval(std::chrono::milliseconds::zero())
+          .setIdleTimeout(std::chrono::milliseconds::zero())
+          .create("pool");
+
+  auto region = cache.createRegionFactory(RegionShortcut::PROXY)
+                    .setPoolName("pool")
+                    .create("partition_region");
+
+  for (int i = 0; i < ON_SERVERS_TEST_REGION_ENTRIES_SIZE; i++) {
+    region->put("KEY--" + std::to_string(i), "VALUE--" + std::to_string(i));
+  }
+
+  auto threadAux = std::make_shared<std::thread>(
+      executeTestFunctionOnLoopAndExpectNotConnectedException, pool);
+
+  // Sleep a bit to allow for some successful responses before the exception
+  std::this_thread::sleep_for(std::chrono::seconds(5));
+
+  cluster.getServers()[1].stop();
+
+  threadAux->join();
+}
diff --git a/cppcache/src/CqQueryImpl.cpp b/cppcache/src/CqQueryImpl.cpp
index 4b1f26d..eec3da4 100644
--- a/cppcache/src/CqQueryImpl.cpp
+++ b/cppcache/src/CqQueryImpl.cpp
@@ -385,7 +385,7 @@
   GfErrType err = GF_NOERR;
   err = m_tccdm->sendSyncRequest(msg, reply);
   if (err != GF_NOERR) {
-    LOGDEBUG("CqQueryImpl::executeCqWithInitialResults errorred!!!!");
+    LOGDEBUG("CqQueryImpl::executeCqWithInitialResults failed!!!!");
     throwExceptionIfError("CqQuery::executeCqWithInitialResults:", err);
   }
   if (reply.getMessageType() == TcrMessage::EXCEPTION ||
diff --git a/cppcache/src/ExecutionImpl.cpp b/cppcache/src/ExecutionImpl.cpp
index c805a8e..cda2658 100644
--- a/cppcache/src/ExecutionImpl.cpp
+++ b/cppcache/src/ExecutionImpl.cpp
@@ -429,33 +429,20 @@
   std::shared_ptr<CacheableString> exceptionPtr = nullptr;
   GfErrType err = tcrdm->sendRequestToAllServers(
       func.c_str(), getResult, timeout, m_args, m_rc, exceptionPtr);
-  if (exceptionPtr != nullptr && err != GF_NOERR) {
-    LOGDEBUG("Execute errorred: %d", err);
-    // throw FunctionExecutionException( "Execute: failed to execute function
-    // with server." );
-    if (err == GF_CACHESERVER_EXCEPTION) {
-      throw FunctionExecutionException(
-          "Execute: failed to execute function with server.");
-    } else {
-      throwExceptionIfError("Execute", err);
-    }
-  }
-
-  if (err == GF_AUTHENTICATION_FAILED_EXCEPTION ||
-      err == GF_NOT_AUTHORIZED_EXCEPTION ||
-      err == GF_AUTHENTICATION_REQUIRED_EXCEPTION) {
-    throwExceptionIfError("Execute", err);
-  }
 
   if (err != GF_NOERR) {
+    LOGDEBUG("Execute failed: %d", err);
     if (err == GF_CACHESERVER_EXCEPTION) {
-      auto message = std::string("Execute: exception at the server side: ") +
-                     exceptionPtr->value().c_str();
+      std::string message;
+      if (exceptionPtr) {
+        message = std::string("Execute: exception at the server side: ") +
+                  exceptionPtr->value().c_str();
+      } else {
+        message = "Execute: failed to execute function with server.";
+      }
       throw FunctionExecutionException(message);
     } else {
-      LOGDEBUG("Execute errorred with server exception: %d", err);
-      throw FunctionExecutionException(
-          "Execute: failed to execute function on servers.");
+      throwExceptionIfError("Execute", err);
     }
   }
 }
diff --git a/cppcache/src/ThinClientPoolDM.cpp b/cppcache/src/ThinClientPoolDM.cpp
index a828a5b..d8b23b4 100644
--- a/cppcache/src/ThinClientPoolDM.cpp
+++ b/cppcache/src/ThinClientPoolDM.cpp
@@ -671,11 +671,12 @@
         err == GF_NOT_AUTHORIZED_EXCEPTION ||
         err == GF_AUTHENTICATION_REQUIRED_EXCEPTION) {
       finalErrorReturn = err;
-    } else if (!(finalErrorReturn == GF_AUTHENTICATION_FAILED_EXCEPTION ||
-                 finalErrorReturn == GF_NOT_AUTHORIZED_EXCEPTION ||
-                 finalErrorReturn ==
-                     GF_AUTHENTICATION_REQUIRED_EXCEPTION))  // returning auth
-                                                             // errors
+    } else if ((err != GF_NOERR) &&
+               (!(finalErrorReturn == GF_AUTHENTICATION_FAILED_EXCEPTION ||
+                  finalErrorReturn == GF_NOT_AUTHORIZED_EXCEPTION ||
+                  finalErrorReturn ==
+                      GF_AUTHENTICATION_REQUIRED_EXCEPTION)))  // returning auth
+                                                               // errors
     // to client..preference
     // over other errors..
     {
@@ -2415,7 +2416,7 @@
   m_error = m_poolDM->sendRequestToEP(request, reply, m_ep);
   m_error = m_poolDM->handleEPError(m_ep, reply, m_error);
   if (m_error != GF_NOERR) {
-    if (m_error == GF_NOTCON || m_error == GF_IOERR) {
+    if (m_error == GF_NOTCON) {
       return GF_NOERR;  // if server is unavailable its not an error for
       // functionexec OnServers() case
     }
diff --git a/tests/javaobject/MultiGetFunctionISlow.java b/tests/javaobject/MultiGetFunctionISlow.java
new file mode 100755
index 0000000..0d33d05
--- /dev/null
+++ b/tests/javaobject/MultiGetFunctionISlow.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package javaobject;
+
+import org.apache.geode.cache.execute.FunctionContext;
+
+public class MultiGetFunctionISlow extends MultiGetFunctionI {
+
+  public void execute(FunctionContext context) {
+    try {
+      Thread.sleep(4000);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    super.execute(context);
+  }
+
+  @Override
+  public String getId() {
+    return "MultiGetFunctionISlow";
+  }
+}