Feature/geode 7258 2 (#4250)

GEODE-7258: The function retry logic is modified to handle exception
thrown, while trying to connect to a server thats shutdown/closed.

    Co-authored-by: Anil <agingade@pivotal.io>
    Co-authored-by: Xiaojian Zhou <gzhou@pivotal.io>
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
index 35837ac..77cc175 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
@@ -30,6 +30,7 @@
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.FixedPartitionResolver;
@@ -493,6 +494,10 @@
     return bucketId;
   }
 
+  @VisibleForTesting
+  public void scheduleGetPRMetaData(final LocalRegion region, final boolean isRecursive) {
+    scheduleGetPRMetaData((InternalRegion) region, isRecursive);
+  }
 
   public void scheduleGetPRMetaData(final InternalRegion region, final boolean isRecursive) {
     if (this.nonPRs.contains(region.getFullPath())) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
index b920343..e16d6cd 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
@@ -24,7 +24,6 @@
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.execute.Function;
@@ -60,6 +59,8 @@
   /** index of ignoreFailedMembers in flags[] */
   public static final int IGNORE_FAILED_MEMBERS_INDEX = 1;
 
+  private static final int MAX_RETRY_INITIAL_VALUE = -1;
+
   private ExecuteFunctionOp() {
     // no instances allowed
   }
@@ -83,8 +84,8 @@
     } else {
 
       boolean reexecute = false;
+      int maxRetryAttempts = MAX_RETRY_INITIAL_VALUE;
 
-      int maxRetryAttempts = pool.getRetryAttempts();
       if (!isHA) {
         maxRetryAttempts = 0;
       }
@@ -107,11 +108,8 @@
 
         } catch (ServerConnectivityException se) {
 
-          if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-            // If the retryAttempt is set to default(-1). Try it on all servers once.
-            // Calculating number of servers when function is re-executed as it involves
-            // messaging locator.
-            maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1;
+          if (maxRetryAttempts == MAX_RETRY_INITIAL_VALUE) {
+            maxRetryAttempts = pool.calculateRetryAttempts(se);
           }
 
           if ((maxRetryAttempts--) < 1) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
index f4d7520..c40df1c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
@@ -27,7 +27,6 @@
 import org.apache.geode.InternalGemFireError;
 import org.apache.geode.cache.CacheClosedException;
 import org.apache.geode.cache.client.NoAvailableServersException;
-import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
@@ -60,6 +59,8 @@
 
   private static final Logger logger = LogService.getLogger();
 
+  private static final int MAX_RETRY_INITIAL_VALUE = -1;
+
   private ExecuteRegionFunctionOp() {
     // no instances allowed
   }
@@ -67,17 +68,14 @@
   /**
    * Does a execute Function on a server using connections from the given pool to communicate with
    * the server.
-   *
-   * @param pool the pool to use to communicate with the server.
-   * @param resultCollector is used to collect the results from the Server
-   * @param maxRetryAttempts Maximum number of retry attempts
    */
   static void execute(ExecutablePool pool,
       ResultCollector resultCollector,
-      int maxRetryAttempts, boolean isHA,
+      int retryAttempts, boolean isHA,
       ExecuteRegionFunctionOpImpl op, boolean isReexecute,
       Set<String> failedNodes) {
 
+    int maxRetryAttempts = retryAttempts > 0 ? retryAttempts : MAX_RETRY_INITIAL_VALUE;
     if (!isHA) {
       maxRetryAttempts = 0;
     }
@@ -107,11 +105,8 @@
         throw failedException;
       } catch (ServerConnectivityException se) {
 
-        if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-          // If the retryAttempt is set to default(-1). Try it on all servers once.
-          // Calculating number of servers when function is re-executed as it involves
-          // messaging locator.
-          maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1;
+        if (maxRetryAttempts == MAX_RETRY_INITIAL_VALUE) {
+          maxRetryAttempts = ((PoolImpl) pool).calculateRetryAttempts(se);
         }
 
         if ((maxRetryAttempts--) < 1) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
index f41a829..2d5abf1 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
@@ -66,7 +66,6 @@
       ServerRegionFunctionExecutor serverRegionExecutor,
       ResultCollector resultCollector,
       Map<ServerLocation, ? extends HashSet> serverToFilterMap,
-      int mRetryAttempts,
       boolean isHA,
       final java.util.function.Function<ServerRegionFunctionExecutor, AbstractOp> regionFunctionSingleHopOpFunction,
       final Supplier<AbstractOp> executeRegionFunctionOpSupplier) {
@@ -87,7 +86,7 @@
 
     final int retryAttempts =
         SingleHopClientExecutor.submitAllHA(callableTasks, (LocalRegion) region, isHA,
-            resultCollector, failedNodes, mRetryAttempts, ((PoolImpl) pool));
+            resultCollector, failedNodes, ((PoolImpl) pool));
 
     if (isDebugEnabled) {
       logger.debug(
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
index fbb7d8a..cc0d5d5 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
@@ -41,6 +41,7 @@
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionService;
 import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.SubscriptionNotEnabledException;
 import org.apache.geode.cache.client.internal.pooling.ConnectionManager;
@@ -1581,4 +1582,27 @@
   public int getSubscriptionTimeoutMultiplier() {
     return subscriptionTimeoutMultiplier;
   }
+
+  public int calculateRetryAttempts(Throwable cause) {
+
+    int maxRetryAttempts = getRetryAttempts();
+
+    if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
+      // If the retryAttempt is set to default(-1). Try executing on all servers once.
+      // As calculating number of servers involves sending message to locator, it is
+      // done only when there is an exception.
+      if (cause instanceof ServerConnectivityException
+          && cause.getMessage().contains(ConnectionManagerImpl.SOCKET_TIME_OUT_MSG)) {
+        // The request was sent once.
+        maxRetryAttempts = getConnectionSource().getAllServers().size() - 1;
+      } else {
+        // The client was unable to establish a connection before sending the
+        // request.
+        maxRetryAttempts = getConnectionSource().getAllServers().size();
+      }
+    }
+
+    return maxRetryAttempts;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
index dd658ea..4ea852c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
@@ -699,7 +699,7 @@
                     hasResult, emptySet(), true, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region, serverRegionExecutor,
-                resultCollector, serverToBuckets, retryAttempts, function.isHA(),
+                resultCollector, serverToBuckets, function.isHA(),
                 regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
           }
         } else {
@@ -725,7 +725,7 @@
                     hasResult, emptySet(), isBucketFilter, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region,
-                serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts,
+                serverRegionExecutor, resultCollector, serverToFilterMap,
                 function.isHA(), regionFunctionSingleHopOpFunction,
                 executeRegionFunctionOpSupplier);
           }
@@ -786,7 +786,7 @@
                     emptySet(), true, isHA, optimizeForWrite, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region,
-                serverRegionExecutor, resultCollector, serverToBuckets, retryAttempts, isHA,
+                serverRegionExecutor, resultCollector, serverToBuckets, isHA,
                 regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
           }
 
@@ -810,7 +810,7 @@
                     emptySet(), isBucketsAsFilter, isHA, optimizeForWrite, timeoutMs);
 
             ExecuteRegionFunctionSingleHopOp.execute(pool, region,
-                serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts,
+                serverRegionExecutor, resultCollector, serverToFilterMap,
                 isHA, regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
index e5050eb..3799d5d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
@@ -30,7 +30,6 @@
 import org.apache.geode.InternalGemFireException;
 import org.apache.geode.annotations.internal.MakeNotStatic;
 import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.client.PoolFactory;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
 import org.apache.geode.cache.client.internal.GetAllOp.GetAllOpImpl;
@@ -50,6 +49,8 @@
 
   private static final Logger logger = LogService.getLogger();
 
+  private static final int MAX_RETRY_INITIAL_VALUE = -1;
+
   @MakeNotStatic
   static final ExecutorService execService =
       LoggingExecutors.newCachedThreadPool("Function Execution Thread-", true);
@@ -89,11 +90,10 @@
 
   static int submitAllHA(List callableTasks, LocalRegion region, boolean isHA,
       ResultCollector rc, Set<String> failedNodes,
-      final int retryAttemptsArg,
       final PoolImpl pool) {
 
-    ClientMetadataService cms = region.getCache().getClientMetadataService();
-    int maxRetryAttempts = 0;
+    ClientMetadataService cms;
+    int maxRetryAttempts = MAX_RETRY_INITIAL_VALUE;
 
     if (callableTasks != null && !callableTasks.isEmpty()) {
       List futures = null;
@@ -120,15 +120,8 @@
             throw new InternalGemFireException(e.getMessage());
           } catch (ExecutionException ee) {
 
-            if (maxRetryAttempts == 0) {
-              maxRetryAttempts = retryAttemptsArg;
-            }
-
-            if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
-              // If the retryAttempt is set to default(-1). Try it on all servers once.
-              // Calculating number of servers when function is re-executed as it involves
-              // messaging locator.
-              maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1;
+            if (maxRetryAttempts == MAX_RETRY_INITIAL_VALUE) {
+              maxRetryAttempts = pool.calculateRetryAttempts(ee.getCause());
             }
 
             if (ee.getCause() instanceof InternalFunctionInvocationTargetException) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
index 438980c..52e1684 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -70,6 +70,10 @@
 public class ConnectionManagerImpl implements ConnectionManager {
   private static final Logger logger = LogService.getLogger();
   private static final int NOT_WAITING = -1;
+  public static final String BORROW_CONN_ERROR_MSG = "Could not create a new connection to server ";
+  public static final String UNEXPECTED_SOCKET_CLOSED_MSG =
+      "Pool unexpected closed socket on server";
+  public static final String SOCKET_TIME_OUT_MSG = "socket timed out on client";
 
   private final String poolName;
   private final PoolStats poolStats;
@@ -321,8 +325,7 @@
       return connection;
     }
 
-    throw new ServerConnectivityException(
-        "Could not create a new connection to server " + server);
+    throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server);
   }
 
   @Override
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
index ae9ae13..bb11a04 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
@@ -228,9 +228,8 @@
     testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
         (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool
             .execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())),
-            failureMode));
-
-    when(testSupport.getExecutablePool().getRetryAttempts()).thenReturn(retryAttempts);
+            failureMode),
+        retryAttempts);
 
     args = null;
     memberMappedArg = mock(MemberMappedArgument.class);
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
index c91816f..7b8bd10 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
@@ -15,6 +15,7 @@
 package org.apache.geode.cache.client.internal;
 
 import static org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.HAStatus.HA;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -29,6 +30,7 @@
 import org.apache.geode.cache.client.NoAvailableServersException;
 import org.apache.geode.cache.client.ServerConnectivityException;
 import org.apache.geode.cache.client.ServerOperationException;
+import org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.distributed.internal.ServerLocation;
@@ -91,8 +93,6 @@
    * This method has to be {@code static} because it is called before
    * {@link ExecuteFunctionTestSupport} is constructed.
    *
-   * @param whenPoolExecute is the {@link OngoingStubbing} for (one of the ) {@code execute()}
-   *        methods on {@link PoolImpl}
    * @param failureMode is the {@link FailureMode} that determines the kind of exception
    *        to {@code throw}
    */
@@ -118,7 +118,8 @@
         whenPoolExecute.thenReturn(null);
         break;
       case THROW_SERVER_CONNECTIVITY_EXCEPTION:
-        whenPoolExecute.thenThrow(new ServerConnectivityException("testing"));
+        whenPoolExecute.thenThrow(new ServerConnectivityException(
+            ConnectionManagerImpl.SOCKET_TIME_OUT_MSG));
         break;
       case THROW_SERVER_OPERATION_EXCEPTION:
         whenPoolExecute.thenThrow(new ServerOperationException("testing"));
@@ -138,7 +139,7 @@
          * we throw this exception first, then we throw ServerConnectivityException
          */
         whenPoolExecute.thenThrow(new InternalFunctionInvocationTargetException("testing"))
-            .thenThrow(new ServerConnectivityException("testing"));
+            .thenThrow(new ServerConnectivityException(ConnectionManagerImpl.SOCKET_TIME_OUT_MSG));
         break;
       default:
         throw new AssertionError("unknown FailureMode type: " + failureMode);
@@ -149,7 +150,7 @@
   ExecuteFunctionTestSupport(
       final HAStatus haStatus,
       final FailureMode failureMode,
-      final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior) {
+      final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior, Integer retryAttempts) {
 
     final List<ServerLocation> servers = (List<ServerLocation>) mock(List.class);
     when(servers.size()).thenReturn(ExecuteFunctionTestSupport.NUMBER_OF_SERVERS);
@@ -174,6 +175,9 @@
 
     executablePool = mock(PoolImpl.class);
     when(executablePool.getConnectionSource()).thenReturn(connectionSource);
+    when(executablePool.getRetryAttempts()).thenReturn(retryAttempts);
+    when(executablePool.calculateRetryAttempts(any(ServerConnectivityException.class)))
+        .thenCallRealMethod();
 
     addPoolMockBehavior.accept(executablePool, failureMode);
   }
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
index 74f6748..60b464e 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
@@ -41,6 +41,7 @@
 import org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.FailureMode;
 import org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.FunctionIdentifierType;
 import org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.HAStatus;
+import org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
@@ -284,7 +285,8 @@
               when.thenReturn(null);
               break;
             case THROW_SERVER_CONNECTIVITY_EXCEPTION:
-              when.thenThrow(new ServerConnectivityException("testing"));
+              when.thenThrow(new ServerConnectivityException(
+                  ConnectionManagerImpl.SOCKET_TIME_OUT_MSG));
               break;
             case THROW_SERVER_OPERATION_EXCEPTION:
               when.thenThrow(new ServerOperationException("testing"));
@@ -304,12 +306,13 @@
                * we throw this exception first, then we throw ServerConnectivityException
                */
               when.thenThrow(new InternalFunctionInvocationTargetException("testing"))
-                  .thenThrow(new ServerConnectivityException("testing"));
+                  .thenThrow(
+                      new ServerConnectivityException(ConnectionManagerImpl.SOCKET_TIME_OUT_MSG));
               break;
             default:
               throw new AssertionError("unknown FailureMode type: " + failureMode);
           }
-        });
+        }, retryAttempts);
 
     executeFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
         testSupport.getExecutablePool(),
@@ -325,7 +328,8 @@
     testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
         (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool
             .execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())),
-            failureMode));
+            failureMode),
+        retryAttempts);
 
     reExecuteFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
         testSupport.getExecutablePool(),
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
index aef00fb..5649b1b 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
@@ -137,7 +137,7 @@
   }
 
   private void createMocks(final HAStatus haStatus,
-      final FailureMode failureModeArg) {
+      final FailureMode failureModeArg, Integer retryAttempts) {
 
     testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
         (pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(
@@ -146,7 +146,8 @@
                 ArgumentMatchers.any(),
                 ArgumentMatchers.anyBoolean(),
                 ArgumentMatchers.anyBoolean())),
-            failureMode));
+            failureMode),
+        retryAttempts);
 
     serverToFilterMap = new HashMap<>();
     serverToFilterMap.put(new ServerLocation("host1", 10), new HashSet<>());
@@ -158,7 +159,7 @@
       final int retryAttempts, final int expectTries,
       final FailureMode failureMode) {
 
-    createMocks(haStatus, failureMode);
+    createMocks(haStatus, failureMode, retryAttempts);
 
     executeFunctionSingleHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
         testSupport.getExecutablePool(),
@@ -182,7 +183,6 @@
             () -> ignoreServerConnectivityException(() -> ExecuteRegionFunctionSingleHopOp.execute(
                 executablePool, testSupport.getRegion(),
                 executor, resultCollector, serverToFilterMap,
-                retryAttempts,
                 testSupport.toBoolean(haStatus),
                 executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
                     testSupport.getRegion().getFullPath(), FUNCTION_NAME,
@@ -199,7 +199,6 @@
         ignoreServerConnectivityException(
             () -> ExecuteRegionFunctionSingleHopOp.execute(executablePool, testSupport.getRegion(),
                 executor, resultCollector, serverToFilterMap,
-                retryAttempts,
                 function.isHA(),
                 executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
                     testSupport.getRegion().getFullPath(), function,
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java
new file mode 100644
index 0000000..901179f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.gms.membership.HostAddress;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PoolFactoryImpl;
+import org.apache.geode.internal.cache.PoolManagerImpl;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.net.SSLConfigurationFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+
+@Category({ClientServerTest.class})
+public class PoolImplTest {
+
+  @Test
+  public void calculateRetryAttemptsDoesNotDecrementRetryCountForFailureWithUnexpectedSocketClose() {
+    List servers = mock(List.class);
+    when(servers.size()).thenReturn(1);
+    ConnectionSource connectionSource = mock(ConnectionSource.class);
+    when(connectionSource.getAllServers()).thenReturn(servers);
+    ServerConnectivityException serverConnectivityException =
+        mock(ServerConnectivityException.class);
+    when(serverConnectivityException.getMessage())
+        .thenReturn(ConnectionManagerImpl.UNEXPECTED_SOCKET_CLOSED_MSG);
+
+    PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS));
+    when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+    assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(1);
+  }
+
+  @Test
+  public void calculateRetryAttemptsDoesNotDecrementRetryCountForFailureDuringBorrowConnection() {
+    List servers = mock(List.class);
+    when(servers.size()).thenReturn(1);
+    ConnectionSource connectionSource = mock(ConnectionSource.class);
+    when(connectionSource.getAllServers()).thenReturn(servers);
+    ServerConnectivityException serverConnectivityException =
+        mock(ServerConnectivityException.class);
+    when(serverConnectivityException.getMessage())
+        .thenReturn(ConnectionManagerImpl.BORROW_CONN_ERROR_MSG);
+
+    PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS));
+    when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+    assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(1);
+  }
+
+  @Test
+  public void calculateRetryAttemptsDecrementsRetryCountForFailureAfterSendingTheRequest() {
+    List servers = mock(List.class);
+    when(servers.size()).thenReturn(1);
+    ConnectionSource connectionSource = mock(ConnectionSource.class);
+    when(connectionSource.getAllServers()).thenReturn(servers);
+    ServerConnectivityException serverConnectivityException =
+        mock(ServerConnectivityException.class);
+    when(serverConnectivityException.getMessage())
+        .thenReturn(ConnectionManagerImpl.SOCKET_TIME_OUT_MSG);
+
+    PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS));
+    when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+    assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(0);
+  }
+
+  @Test
+  public void calculateRetryAttemptsReturnsTheRetyCountConfiguredWithPool() {
+    int retryCount = 1;
+    List servers = mock(List.class);
+    when(servers.size()).thenReturn(1);
+    ConnectionSource connectionSource = mock(ConnectionSource.class);
+    when(connectionSource.getAllServers()).thenReturn(servers);
+    ServerConnectivityException serverConnectivityException =
+        mock(ServerConnectivityException.class);
+    when(serverConnectivityException.getMessage()).thenReturn("Timeout Exception");
+
+    PoolImpl poolImpl = spy(getPool(retryCount));
+    when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+    assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(retryCount);
+  }
+
+  private PoolImpl getPool(int retryAttemptsAttribute) {
+    final DistributionConfig distributionConfig = mock(DistributionConfig.class);
+    doReturn(new SecurableCommunicationChannel[] {}).when(distributionConfig)
+        .getSecurableCommunicationChannels();
+
+    SSLConfigurationFactory.setDistributionConfig(distributionConfig);
+
+    final Properties properties = new Properties();
+    properties.put(DURABLE_CLIENT_ID, "1");
+
+    final Statistics statistics = mock(Statistics.class);
+
+    final PoolFactoryImpl.PoolAttributes poolAttributes =
+        mock(PoolFactoryImpl.PoolAttributes.class);
+
+    /*
+     * These are the minimum pool attributes required
+     * so that basic validation and setup completes successfully. The values of
+     * these attributes have no importance to the assertions of the test itself.
+     */
+    doReturn(1).when(poolAttributes).getMaxConnections();
+    doReturn((long) 10e8).when(poolAttributes).getPingInterval();
+    doReturn(retryAttemptsAttribute).when(poolAttributes).getRetryAttempts();
+
+    final CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+    final InternalCache internalCache = mock(InternalCache.class);
+    doReturn(cancelCriterion).when(internalCache).getCancelCriterion();
+
+    final InternalDistributedSystem internalDistributedSystem =
+        mock(InternalDistributedSystem.class);
+    doReturn(distributionConfig).when(internalDistributedSystem).getConfig();
+    doReturn(properties).when(internalDistributedSystem).getProperties();
+    doReturn(statistics).when(internalDistributedSystem).createAtomicStatistics(any(), anyString());
+
+    final PoolManagerImpl poolManager = mock(PoolManagerImpl.class);
+    doReturn(true).when(poolManager).isNormal();
+
+    final ThreadsMonitoring tMonitoring = mock(ThreadsMonitoring.class);
+
+    return PoolImpl.create(poolManager, "pool", poolAttributes, new LinkedList<HostAddress>(),
+        internalDistributedSystem, internalCache, tMonitoring);
+  }
+
+}
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
index ac315bc..98d39d6 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
@@ -14,67 +14,39 @@
  */
 package org.apache.geode.cache.lucene;
 
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.apache.geode.test.dunit.Assert.fail;
-import static org.junit.Assert.assertEquals;
 
 import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.logging.log4j.Logger;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import org.apache.geode.cache.GemFireCache;
 import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
-import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.distributed.Locator;
-import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.dunit.DistributedTestUtils;
 import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.Invoke;
 import org.apache.geode.test.dunit.NetworkUtils;
 import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.internal.DUnitLauncher;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
 import org.apache.geode.test.version.TestVersion;
 import org.apache.geode.test.version.VersionManager;
 
 @RunWith(Parameterized.class)
 @Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
-public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCase {
+public abstract class LuceneSearchWithRollingUpgradeDUnit
+    extends LuceneSearchWithRollingUpgradeTestBase {
 
 
-  @Parameterized.Parameters(name = "from_v{0}, with reindex={1}")
+  @Parameterized.Parameters(name = "from_v{0}, with reindex={1}, singleHopEnabled={2}")
   public static Collection<Object[]> data() {
     Collection<String> luceneVersions = getLuceneVersions();
     Collection<Object[]> rval = new ArrayList<>();
     luceneVersions.forEach(v -> {
-      rval.add(new Object[] {v, true});
-      rval.add(new Object[] {v, false});
+      rval.add(new Object[] {v, true, true});
+      rval.add(new Object[] {v, false, true});
     });
     return rval;
   }
@@ -84,6 +56,10 @@
     // Lucene Compatibility checks start with Apache Geode v1.2.0
     // Removing the versions older than v1.2.0
     result.removeIf(s -> TestVersion.compare(s, "1.2.0") < 0);
+
+    // The changes relating to GEODE-7258 is not applied on 1.10.0, skipping rolling
+    // upgrade for 1.10.0. The change was verified by rolling from develop to develop.
+    result.removeIf(s -> TestVersion.compare(s, "1.10.0") == 0);
     if (result.size() < 1) {
       throw new RuntimeException("No older versions of Geode were found to test against");
     } else {
@@ -92,15 +68,6 @@
     return result;
   }
 
-  private File[] testingDirs = new File[3];
-
-  protected static String INDEX_NAME = "index";
-
-  private static String diskDir = "LuceneSearchWithRollingUpgradeDUnit";
-
-  // Each vm will have a cache object
-  protected static Object cache;
-
   // the old version of Geode we're testing against
   @Parameterized.Parameter()
   public String oldVersion;
@@ -108,137 +75,8 @@
   @Parameterized.Parameter(1)
   public Boolean reindex;
 
-  private void deleteVMFiles() {
-    System.out.println("deleting files in vm" + VM.getCurrentVMNum());
-    File pwd = new File(".");
-    for (File entry : pwd.listFiles()) {
-      try {
-        if (entry.isDirectory()) {
-          FileUtils.deleteDirectory(entry);
-        } else {
-          entry.delete();
-        }
-      } catch (Exception e) {
-        System.out.println("Could not delete " + entry + ": " + e.getMessage());
-      }
-    }
-  }
-
-  private void deleteWorkingDirFiles() {
-    Invoke.invokeInEveryVM("delete files", () -> deleteVMFiles());
-  }
-
-  @Override
-  public void postSetUp() {
-    deleteWorkingDirFiles();
-    IgnoredException.addIgnoredException(
-        "cluster configuration service not available|ConflictingPersistentDataException");
-  }
-
-
-  Properties getLocatorPropertiesPre91(String locatorsString) {
-    Properties props = new Properties();
-    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
-    props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
-    props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel);
-    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
-    return props;
-  }
-
-  VM rollClientToCurrentAndCreateRegion(VM oldClient, ClientRegionShortcut shortcut,
-      String regionName, String[] hostNames, int[] locatorPorts, boolean subscriptionEnabled) {
-    VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled);
-    // recreate region on "rolled" client
-    invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient);
-    return rollClient;
-  }
-
-  private VM rollClientToCurrent(VM oldClient, String[] hostNames, int[] locatorPorts,
-      boolean subscriptionEnabled) {
-    oldClient.invoke(invokeCloseCache());
-    VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId());
-    rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts,
-        subscriptionEnabled));
-    rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
-    return rollClient;
-  }
-
-  CacheSerializableRunnable invokeCreateClientRegion(final String regionName,
-      final ClientRegionShortcut shortcut) {
-    return new CacheSerializableRunnable("execute: createClientRegion") {
-      @Override
-      public void run2() {
-        try {
-          createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, regionName,
-              shortcut);
-        } catch (Exception e) {
-          fail("Error creating client region", e);
-        }
-      }
-    };
-  }
-
-  private static void createClientRegion(GemFireCache cache, String regionName,
-      ClientRegionShortcut shortcut) {
-    ClientRegionFactory rf = ((ClientCache) cache).createClientRegionFactory(shortcut);
-    rf.create(regionName);
-  }
-
-  CacheSerializableRunnable invokeStartCacheServer(final int port) {
-    return new CacheSerializableRunnable("execute: startCacheServer") {
-      @Override
-      public void run2() {
-        try {
-          startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, port);
-        } catch (Exception e) {
-          fail("Error creating cache", e);
-        }
-      }
-    };
-  }
-
-  private static void startCacheServer(GemFireCache cache, int port) throws Exception {
-    CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer();
-    cacheServer.setPort(port);
-    cacheServer.start();
-  }
-
-  CacheSerializableRunnable invokeCreateClientCache(final Properties systemProperties,
-      final String[] hosts, final int[] ports, boolean subscriptionEnabled) {
-    return new CacheSerializableRunnable("execute: createClientCache") {
-      @Override
-      public void run2() {
-        try {
-          LuceneSearchWithRollingUpgradeDUnit.cache =
-              createClientCache(systemProperties, hosts, ports, subscriptionEnabled);
-        } catch (Exception e) {
-          fail("Error creating client cache", e);
-        }
-      }
-    };
-  }
-
-  Properties getClientSystemProperties() {
-    Properties p = new Properties();
-    p.setProperty("mcast-port", "0");
-    return p;
-  }
-
-
-  private static ClientCache createClientCache(Properties systemProperties, String[] hosts,
-      int[] ports, boolean subscriptionEnabled) {
-    ClientCacheFactory cf = new ClientCacheFactory(systemProperties);
-    if (subscriptionEnabled) {
-      cf.setPoolSubscriptionEnabled(true);
-      cf.setPoolSubscriptionRedundancy(-1);
-    }
-    int hostsLength = hosts.length;
-    for (int i = 0; i < hostsLength; i++) {
-      cf.addPoolLocator(hosts[i], ports[i]);
-    }
-
-    return cf.create();
-  }
+  @Parameterized.Parameter(2)
+  public Boolean singleHopEnabled;
 
   // We start an "old" locator and old servers
   // We roll the locator
@@ -303,7 +141,7 @@
           locatorString);
 
       server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType,
-          testingDirs[0], shortcutName, regionName, locatorPorts);
+          testingDirs[0], shortcutName, regionName, locatorPorts, reindex);
       verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server1);
       expectedRegionSize += 5;
       putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 5,
@@ -313,7 +151,7 @@
           20, server1, server3);
 
       server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType,
-          testingDirs[1], shortcutName, regionName, locatorPorts);
+          testingDirs[1], shortcutName, regionName, locatorPorts, reindex);
       verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server2);
       expectedRegionSize += 5;
       putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
@@ -323,7 +161,7 @@
           30, server2, server3);
 
       server3 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server3, regionType,
-          testingDirs[2], shortcutName, regionName, locatorPorts);
+          testingDirs[2], shortcutName, regionName, locatorPorts, reindex);
       verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server3);
       putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName, expectedRegionSize, 15,
           25, server1, server2);
@@ -340,627 +178,4 @@
     }
   }
 
-  void putSerializableObjectAndVerifyLuceneQueryResult(VM putter, String regionName,
-      int expectedRegionSize, int start, int end, VM... vms) throws Exception {
-    // do puts
-    putSerializableObject(putter, regionName, start, end);
-
-    // verify present in others
-    verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, vms);
-  }
-
-  void putSerializableObject(VM putter, String regionName, int start, int end)
-      throws Exception {
-    for (int i = start; i < end; i++) {
-      Class aClass = Thread.currentThread().getContextClassLoader()
-          .loadClass("org.apache.geode.cache.query.data.Portfolio");
-      Constructor portfolioConstructor = aClass.getConstructor(int.class);
-      Object serializableObject = portfolioConstructor.newInstance(i);
-      putter.invoke(invokePut(regionName, i, serializableObject));
-    }
-  }
-
-  private void waitForRegionToHaveExpectedSize(String regionName, int expectedRegionSize) {
-    await().untilAsserted(() -> {
-      Object region =
-          cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
-      int regionSize = (int) region.getClass().getMethod("size").invoke(region);
-      assertEquals("Region size not as expected after 60 seconds", expectedRegionSize,
-          regionSize);
-    });
-  }
-
-  void verifyLuceneQueryResults(String regionName, int expectedRegionSize)
-      throws Exception {
-    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
-    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
-    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
-    luceneService.getClass()
-        .getMethod("waitUntilFlushed", String.class, String.class, long.class, TimeUnit.class)
-        .invoke(luceneService, INDEX_NAME, regionName, 60, TimeUnit.SECONDS);
-    Method createLuceneQueryFactoryMethod =
-        luceneService.getClass().getMethod("createLuceneQueryFactory");
-    createLuceneQueryFactoryMethod.setAccessible(true);
-    Object luceneQueryFactory = createLuceneQueryFactoryMethod.invoke(luceneService);
-    Object luceneQuery = luceneQueryFactory.getClass()
-        .getMethod("create", String.class, String.class, String.class, String.class)
-        .invoke(luceneQueryFactory, INDEX_NAME, regionName, "active", "status");
-
-    Collection resultsActive = executeLuceneQuery(luceneQuery);
-
-    luceneQuery = luceneQueryFactory.getClass()
-        .getMethod("create", String.class, String.class, String.class, String.class)
-        .invoke(luceneQueryFactory, INDEX_NAME, regionName, "inactive", "status");
-
-    Collection resultsInactive = executeLuceneQuery(luceneQuery);
-
-    assertEquals("Result size not as expected ", expectedRegionSize,
-        resultsActive.size() + resultsInactive.size());
-  }
-
-  private Collection executeLuceneQuery(Object luceneQuery)
-      throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
-    Collection results = null;
-    int retryCount = 10;
-    while (true) {
-      try {
-        results = (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
-        break;
-      } catch (Exception ex) {
-        if (!ex.getCause().getMessage().contains("currently indexing")) {
-          throw ex;
-        }
-        if (--retryCount == 0) {
-          throw ex;
-        }
-      }
-    }
-    return results;
-
-  }
-
-  private void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize,
-      VM... vms) {
-    for (VM vm : vms) {
-      vm.invoke(() -> waitForRegionToHaveExpectedSize(regionName, expectedRegionSize));
-      vm.invoke(() -> verifyLuceneQueryResults(regionName, expectedRegionSize));
-    }
-
-  }
-
-  void invokeRunnableInVMs(CacheSerializableRunnable runnable, VM... vms) {
-    for (VM vm : vms) {
-      vm.invoke(runnable);
-    }
-  }
-
-  // Used to close cache and make sure we attempt on all vms even if some do not have a cache
-  void invokeRunnableInVMs(boolean catchErrors, CacheSerializableRunnable runnable,
-      VM... vms) {
-    for (VM vm : vms) {
-      try {
-        vm.invoke(runnable);
-      } catch (Exception e) {
-        if (!catchErrors) {
-          throw e;
-        }
-      }
-    }
-  }
-
-  private VM rollServerToCurrent(VM oldServer, int[] locatorPorts) {
-    // Roll the server
-    oldServer.invoke(invokeCloseCache());
-    VM rollServer = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldServer.getId());
-    rollServer.invoke(invokeCreateCache(locatorPorts == null ? getSystemPropertiesPost71()
-        : getSystemPropertiesPost71(locatorPorts)));
-    rollServer.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
-    return rollServer;
-  }
-
-  VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer, String regionType,
-      File diskdir, String shortcutName, String regionName, int[] locatorPorts) {
-    VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
-    return createLuceneIndexAndRegionOnRolledServer(regionType, diskdir, shortcutName, regionName,
-        rollServer);
-  }
-
-  private VM createLuceneIndexAndRegionOnRolledServer(String regionType, File diskdir,
-      String shortcutName, String regionName, VM rollServer) {
-
-    Boolean serializeIt = reindex;
-    rollServer.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = serializeIt);
-    rollServer.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
-    // recreate region on "rolled" server
-    if ((regionType.equals("persistentPartitioned"))) {
-      CacheSerializableRunnable runnable =
-          invokeCreatePersistentPartitionedRegion(regionName, diskdir);
-      invokeRunnableInVMs(runnable, rollServer);
-    } else {
-      invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
-    }
-    rollServer.invoke(invokeRebalance());
-    return rollServer;
-  }
-
-  VM rollServerToCurrentAndCreateRegionOnly(VM oldServer, String regionType, File diskdir,
-      String shortcutName, String regionName, int[] locatorPorts) {
-    VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
-    // recreate region on "rolled" server
-    if ((regionType.equals("persistentPartitioned"))) {
-      CacheSerializableRunnable runnable =
-          invokeCreatePersistentPartitionedRegion(regionName, diskdir);
-      invokeRunnableInVMs(runnable, rollServer);
-    } else {
-      invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
-    }
-    rollServer.invoke(invokeRebalance());
-    return rollServer;
-  }
-
-  VM rollLocatorToCurrent(VM oldLocator, final String serverHostName, final int port,
-      final String testName, final String locatorString) {
-    // Roll the locator
-    oldLocator.invoke(invokeStopLocator());
-    VM rollLocator = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldLocator.getId());
-    final Properties props = new Properties();
-    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
-    rollLocator.invoke(invokeStartLocator(serverHostName, port, testName, locatorString, props));
-    return rollLocator;
-  }
-
-  // Due to licensing changes
-  private Properties getSystemPropertiesPost71() {
-    Properties props = getSystemProperties();
-    return props;
-  }
-
-  // Due to licensing changes
-  private Properties getSystemPropertiesPost71(int[] locatorPorts) {
-    Properties props = getSystemProperties(locatorPorts);
-    return props;
-  }
-
-  private Properties getSystemProperties() {
-    Properties props = DistributedTestUtils.getAllDistributedSystemProperties(new Properties());
-    props.remove("disable-auto-reconnect");
-    props.remove(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME);
-    props.remove(DistributionConfig.LOCK_MEMORY_NAME);
-    return props;
-  }
-
-  Properties getSystemProperties(int[] locatorPorts) {
-    Properties p = new Properties();
-    String locatorString = getLocatorString(locatorPorts);
-    p.setProperty("locators", locatorString);
-    p.setProperty("mcast-port", "0");
-    return p;
-  }
-
-  static String getLocatorString(int locatorPort) {
-    String locatorString = getDUnitLocatorAddress() + "[" + locatorPort + "]";
-    return locatorString;
-  }
-
-  static String getLocatorString(int[] locatorPorts) {
-    StringBuilder locatorString = new StringBuilder();
-    int numLocators = locatorPorts.length;
-    for (int i = 0; i < numLocators; i++) {
-      locatorString.append(getLocatorString(locatorPorts[i]));
-      if (i + 1 < numLocators) {
-        locatorString.append(",");
-      }
-    }
-    return locatorString.toString();
-  }
-
-  private CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port,
-      final String testName, final String locatorsString, final Properties props) {
-    return new CacheSerializableRunnable("execute: startLocator") {
-      @Override
-      public void run2() {
-        try {
-          startLocator(serverHostName, port, testName, locatorsString, props);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port,
-      final Properties props) {
-    return new CacheSerializableRunnable("execute: startLocator") {
-      @Override
-      public void run2() {
-        try {
-          startLocator(serverHostName, port, props);
-        } catch (Exception e) {
-          fail("Error starting locators", e);
-        }
-      }
-    };
-  }
-
-  CacheSerializableRunnable invokeCreateCache(final Properties systemProperties) {
-    return new CacheSerializableRunnable("execute: createCache") {
-      @Override
-      public void run2() {
-        try {
-          LuceneSearchWithRollingUpgradeDUnit.cache = createCache(systemProperties);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private CacheSerializableRunnable invokeAssertVersion(final short version) {
-    return new CacheSerializableRunnable("execute: assertVersion") {
-      @Override
-      public void run2() {
-        try {
-          assertVersion(LuceneSearchWithRollingUpgradeDUnit.cache, version);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  CacheSerializableRunnable invokeCreateRegion(final String regionName,
-      final String shortcutName) {
-    return new CacheSerializableRunnable("execute: createRegion") {
-      @Override
-      public void run2() {
-        try {
-          createRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, shortcutName);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private CacheSerializableRunnable invokeCreatePersistentPartitionedRegion(final String regionName,
-      final File diskstore) {
-    return new CacheSerializableRunnable("execute: createPersistentPartitonedRegion") {
-      @Override
-      public void run2() {
-        try {
-          createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName,
-              diskstore);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private CacheSerializableRunnable invokePut(final String regionName, final Object key,
-      final Object value) {
-    return new CacheSerializableRunnable("execute: put") {
-      @Override
-      public void run2() {
-        try {
-          put(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, key, value);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  CacheSerializableRunnable invokeStopLocator() {
-    return new CacheSerializableRunnable("execute: stopLocator") {
-      @Override
-      public void run2() {
-        try {
-          stopLocator();
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  CacheSerializableRunnable invokeCloseCache() {
-    return new CacheSerializableRunnable("execute: closeCache") {
-      @Override
-      public void run2() {
-        try {
-          closeCache(LuceneSearchWithRollingUpgradeDUnit.cache);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private CacheSerializableRunnable invokeRebalance() {
-    return new CacheSerializableRunnable("execute: rebalance") {
-      @Override
-      public void run2() {
-        try {
-          rebalance(LuceneSearchWithRollingUpgradeDUnit.cache);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-  }
-
-  private void deleteDiskStores() {
-    try {
-      FileUtils.deleteDirectory(new File(diskDir).getAbsoluteFile());
-    } catch (IOException e) {
-      throw new Error("Error deleting files", e);
-    }
-  }
-
-  private static Object createCache(Properties systemProperties) throws Exception {
-
-    Class distConfigClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.distributed.internal.DistributionConfigImpl");
-    boolean disableConfig = true;
-    try {
-      distConfigClass.getDeclaredField("useSharedConfiguration");
-    } catch (NoSuchFieldException e) {
-      disableConfig = false;
-    }
-    if (disableConfig) {
-      systemProperties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
-    }
-
-    Class cacheFactoryClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.CacheFactory");
-    Constructor constructor = cacheFactoryClass.getConstructor(Properties.class);
-    constructor.setAccessible(true);
-    Object cacheFactory = constructor.newInstance(systemProperties);
-
-    Method createMethod = cacheFactoryClass.getMethod("create");
-    createMethod.setAccessible(true);
-    Object cache = createMethod.invoke(cacheFactory);
-    return cache;
-  }
-
-  private static Object getRegion(Object cache, String regionName) throws Exception {
-    return cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
-  }
-
-  private static Object put(Object cache, String regionName, Object key, Object value)
-      throws Exception {
-    Object region = getRegion(cache, regionName);
-    return region.getClass().getMethod("put", Object.class, Object.class).invoke(region, key,
-        value);
-  }
-
-  private static void createRegion(Object cache, String regionName, String shortcutName)
-      throws Exception {
-    Class aClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.RegionShortcut");
-    Object[] enumConstants = aClass.getEnumConstants();
-    Object shortcut = null;
-    int length = enumConstants.length;
-    for (int i = 0; i < length; i++) {
-      Object constant = enumConstants[i];
-      if (((Enum) constant).name().equals(shortcutName)) {
-        shortcut = constant;
-        break;
-      }
-    }
-
-    Method createRegionFactoryMethod = cache.getClass().getMethod("createRegionFactory", aClass);
-    createRegionFactoryMethod.setAccessible(true);
-    Object regionFactory = createRegionFactoryMethod.invoke(cache, shortcut);
-    Method createMethod = regionFactory.getClass().getMethod("create", String.class);
-    createMethod.setAccessible(true);
-    createMethod.invoke(regionFactory, regionName);
-  }
-
-  static void createLuceneIndex(Object cache, String regionName, String indexName)
-      throws Exception {
-    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
-    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
-    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
-    Method createLuceneIndexFactoryMethod =
-        luceneService.getClass().getMethod("createIndexFactory");
-    createLuceneIndexFactoryMethod.setAccessible(true);
-    Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
-    luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
-        "status");
-    luceneIndexFactory.getClass().getMethod("create", String.class, String.class)
-        .invoke(luceneIndexFactory, indexName, regionName);
-  }
-
-  static void createLuceneIndexOnExistingRegion(Object cache, String regionName,
-      String indexName) throws Exception {
-    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
-    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
-    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
-    Method createLuceneIndexFactoryMethod =
-        luceneService.getClass().getMethod("createIndexFactory");
-    createLuceneIndexFactoryMethod.setAccessible(true);
-    Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
-    luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
-        "status");
-    luceneIndexFactory.getClass().getMethod("create", String.class, String.class, boolean.class)
-        .invoke(luceneIndexFactory, indexName, regionName, true);
-  }
-
-  private static void createPersistentPartitonedRegion(Object cache, String regionName,
-      File diskStore) throws Exception {
-    Object store = cache.getClass().getMethod("findDiskStore", String.class).invoke(cache, "store");
-    Class dataPolicyObject = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.cache.DataPolicy");
-    Object dataPolicy = dataPolicyObject.getField("PERSISTENT_PARTITION").get(null);
-    if (store == null) {
-      Object dsf = cache.getClass().getMethod("createDiskStoreFactory").invoke(cache);
-      dsf.getClass().getMethod("setMaxOplogSize", long.class).invoke(dsf, 1L);
-      dsf.getClass().getMethod("setDiskDirs", File[].class).invoke(dsf,
-          new Object[] {new File[] {diskStore.getAbsoluteFile()}});
-      dsf.getClass().getMethod("create", String.class).invoke(dsf, "store");
-    }
-    Object rf = cache.getClass().getMethod("createRegionFactory").invoke(cache);
-    rf.getClass().getMethod("setDiskStoreName", String.class).invoke(rf, "store");
-    rf.getClass().getMethod("setDataPolicy", dataPolicy.getClass()).invoke(rf, dataPolicy);
-    rf.getClass().getMethod("create", String.class).invoke(rf, regionName);
-  }
-
-  private static void assertVersion(Object cache, short ordinal) throws Exception {
-    Class idmClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.distributed.internal.membership.InternalDistributedMember");
-    Method getDSMethod = cache.getClass().getMethod("getDistributedSystem");
-    getDSMethod.setAccessible(true);
-    Object ds = getDSMethod.invoke(cache);
-
-    Method getDistributedMemberMethod = ds.getClass().getMethod("getDistributedMember");
-    getDistributedMemberMethod.setAccessible(true);
-    Object member = getDistributedMemberMethod.invoke(ds);
-    Method getVersionObjectMethod = member.getClass().getMethod("getVersionObject");
-    getVersionObjectMethod.setAccessible(true);
-    Object thisVersion = getVersionObjectMethod.invoke(member);
-    Method getOrdinalMethod = thisVersion.getClass().getMethod("ordinal");
-    getOrdinalMethod.setAccessible(true);
-    short thisOrdinal = (Short) getOrdinalMethod.invoke(thisVersion);
-    if (ordinal != thisOrdinal) {
-      throw new Error(
-          "Version ordinal:" + thisOrdinal + " was not the expected ordinal of:" + ordinal);
-    }
-  }
-
-  private static void stopCacheServers(Object cache) throws Exception {
-    Method getCacheServersMethod = cache.getClass().getMethod("getCacheServers");
-    getCacheServersMethod.setAccessible(true);
-    List cacheServers = (List) getCacheServersMethod.invoke(cache);
-    Method stopMethod = null;
-    for (Object cs : cacheServers) {
-      if (stopMethod == null) {
-        stopMethod = cs.getClass().getMethod("stop");
-      }
-      stopMethod.setAccessible(true);
-      stopMethod.invoke(cs);
-    }
-  }
-
-  private static void closeCache(Object cache) throws Exception {
-    if (cache == null) {
-      return;
-    }
-    Method isClosedMethod = cache.getClass().getMethod("isClosed");
-    isClosedMethod.setAccessible(true);
-    boolean cacheClosed = (Boolean) isClosedMethod.invoke(cache);
-    if (cache != null && !cacheClosed) {
-      stopCacheServers(cache);
-      Method method = cache.getClass().getMethod("close");
-      method.setAccessible(true);
-      method.invoke(cache);
-      long startTime = System.currentTimeMillis();
-      while (!cacheClosed && System.currentTimeMillis() - startTime < 30000) {
-        try {
-          Thread.sleep(1000);
-          Method cacheClosedMethod = cache.getClass().getMethod("isClosed");
-          cacheClosedMethod.setAccessible(true);
-          cacheClosed = (Boolean) cacheClosedMethod.invoke(cache);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      }
-    }
-  }
-
-  private static void rebalance(Object cache) throws Exception {
-    Method getRMMethod = cache.getClass().getMethod("getResourceManager");
-    getRMMethod.setAccessible(true);
-    Object manager = getRMMethod.invoke(cache);
-
-    Method createRebalanceFactoryMethod = manager.getClass().getMethod("createRebalanceFactory");
-    createRebalanceFactoryMethod.setAccessible(true);
-    Object rebalanceFactory = createRebalanceFactoryMethod.invoke(manager);
-    Method m = rebalanceFactory.getClass().getMethod("start");
-    m.setAccessible(true);
-    Object op = m.invoke(rebalanceFactory);
-
-    // Wait until the rebalance is complete
-    try {
-      Method getResultsMethod = op.getClass().getMethod("getResults");
-      getResultsMethod.setAccessible(true);
-      Object results = getResultsMethod.invoke(op);
-      Method getTotalTimeMethod = results.getClass().getMethod("getTotalTime");
-      getTotalTimeMethod.setAccessible(true);
-      System.out.println("Took " + getTotalTimeMethod.invoke(results) + " milliseconds\n");
-      Method getTotalBucketsMethod = results.getClass().getMethod("getTotalBucketTransferBytes");
-      getTotalBucketsMethod.setAccessible(true);
-      System.out.println("Transfered " + getTotalBucketsMethod.invoke(results) + "bytes\n");
-    } catch (Exception e) {
-      Thread.currentThread().interrupt();
-      throw e;
-    }
-  }
-
-  /**
-   * Starts a locator with given configuration.
-   */
-  private static void startLocator(final String serverHostName, final int port,
-      final String testName, final String locatorsString, final Properties props) throws Exception {
-    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
-    props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
-    Logger logger = LogService.getLogger();
-    props.setProperty(DistributionConfig.LOG_LEVEL_NAME, logger.getLevel().name());
-
-    InetAddress bindAddr;
-    try {
-      bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
-    } catch (UnknownHostException uhe) {
-      throw new Error("While resolving bind address ", uhe);
-    }
-
-    File logFile = new File(testName + "-locator" + port + ".log");
-    Class locatorClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.distributed.Locator");
-    Method startLocatorAndDSMethod =
-        locatorClass.getMethod("startLocatorAndDS", int.class, File.class, InetAddress.class,
-            Properties.class, boolean.class, boolean.class, String.class);
-    startLocatorAndDSMethod.setAccessible(true);
-    startLocatorAndDSMethod.invoke(null, port, logFile, bindAddr, props, true, true, null);
-  }
-
-  private static void startLocator(final String serverHostName, final int port, Properties props)
-      throws Exception {
-
-
-    InetAddress bindAddr = null;
-    try {
-      bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
-    } catch (UnknownHostException uhe) {
-      throw new Error("While resolving bind address ", uhe);
-    }
-
-    Locator.startLocatorAndDS(port, new File(""), bindAddr, props, true, true, null);
-    Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available
-  }
-
-  private static void stopLocator() throws Exception {
-    Class internalLocatorClass = Thread.currentThread().getContextClassLoader()
-        .loadClass("org.apache.geode.distributed.internal.InternalLocator");
-    Method locatorMethod = internalLocatorClass.getMethod("getLocator");
-    locatorMethod.setAccessible(true);
-    Object locator = locatorMethod.invoke(null);
-    Method stopLocatorMethod = locator.getClass().getMethod("stop");
-    stopLocatorMethod.setAccessible(true);
-    stopLocatorMethod.invoke(locator);
-  }
-
-  /**
-   * Get the port that the standard dunit locator is listening on.
-   *
-   * @return locator address
-   */
-  private static String getDUnitLocatorAddress() {
-    return Host.getHost(0).getHostName();
-  }
-
 }
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java
new file mode 100644
index 0000000..b08d875
--- /dev/null
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.GemFireCache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.internal.ClientMetadataService;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.internal.DUnitLauncher;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.version.VersionManager;
+
+public abstract class LuceneSearchWithRollingUpgradeTestBase extends JUnit4DistributedTestCase {
+
+  protected File[] testingDirs = new File[3];
+
+  protected static String INDEX_NAME = "index";
+
+  protected static String diskDir = "LuceneSearchWithRollingUpgradeTestBase";
+
+  // Each vm will have a cache object
+  protected static Object cache;
+
+  protected void deleteVMFiles() {
+    System.out.println("deleting files in vm" + VM.getCurrentVMNum());
+    File pwd = new File(".");
+    for (File entry : pwd.listFiles()) {
+      try {
+        if (entry.isDirectory()) {
+          FileUtils.deleteDirectory(entry);
+        } else {
+          entry.delete();
+        }
+      } catch (Exception e) {
+        System.out.println("Could not delete " + entry + ": " + e.getMessage());
+      }
+    }
+  }
+
+  protected void deleteWorkingDirFiles() {
+    Invoke.invokeInEveryVM("delete files", () -> deleteVMFiles());
+  }
+
+  @Override
+  public void postSetUp() {
+    deleteWorkingDirFiles();
+    IgnoredException.addIgnoredException(
+        "cluster configuration service not available|ConflictingPersistentDataException");
+  }
+
+
+  Properties getLocatorPropertiesPre91(String locatorsString) {
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
+    props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel);
+    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
+    return props;
+  }
+
+  VM rollClientToCurrentAndCreateRegion(VM oldClient,
+      ClientRegionShortcut shortcut,
+      String regionName, String[] hostNames, int[] locatorPorts,
+      boolean subscriptionEnabled, boolean singleHopEnabled) {
+    VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled,
+        singleHopEnabled);
+    // recreate region on "rolled" client
+    invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient);
+    return rollClient;
+  }
+
+  protected VM rollClientToCurrent(VM oldClient, String[] hostNames,
+      int[] locatorPorts,
+      boolean subscriptionEnabled, boolean singleHopEnabled) {
+    oldClient.invoke(invokeCloseCache());
+    VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId());
+    rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts,
+        subscriptionEnabled, singleHopEnabled));
+    rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
+    return rollClient;
+  }
+
+  CacheSerializableRunnable invokeCreateClientRegion(final String regionName,
+      final ClientRegionShortcut shortcut) {
+    return new CacheSerializableRunnable("execute: createClientRegion") {
+      @Override
+      public void run2() {
+        try {
+          createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeTestBase.cache,
+              regionName,
+              shortcut);
+        } catch (Exception e) {
+          fail("Error creating client region", e);
+        }
+      }
+    };
+  }
+
+  protected static void createClientRegion(GemFireCache cache, String regionName,
+      ClientRegionShortcut shortcut) {
+    ClientRegionFactory rf = ((ClientCache) cache).createClientRegionFactory(shortcut);
+    rf.create(regionName);
+  }
+
+  CacheSerializableRunnable invokeStartCacheServer(final int port) {
+    return new CacheSerializableRunnable("execute: startCacheServer") {
+      @Override
+      public void run2() {
+        try {
+          startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeTestBase.cache, port);
+        } catch (Exception e) {
+          fail("Error creating cache", e);
+        }
+      }
+    };
+  }
+
+  protected static void startCacheServer(GemFireCache cache, int port) throws Exception {
+    CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer();
+    cacheServer.setPort(port);
+    cacheServer.start();
+  }
+
+  CacheSerializableRunnable invokeCreateClientCache(
+      final Properties systemProperties,
+      final String[] hosts, final int[] ports, boolean subscriptionEnabled,
+      boolean singleHopEnabled) {
+    return new CacheSerializableRunnable("execute: createClientCache") {
+      @Override
+      public void run2() {
+        try {
+          LuceneSearchWithRollingUpgradeTestBase.cache =
+              createClientCache(systemProperties, hosts, ports, subscriptionEnabled,
+                  singleHopEnabled);
+        } catch (Exception e) {
+          fail("Error creating client cache", e);
+        }
+      }
+    };
+  }
+
+  Properties getClientSystemProperties() {
+    Properties p = new Properties();
+    p.setProperty("mcast-port", "0");
+    return p;
+  }
+
+
+  protected static ClientCache createClientCache(Properties systemProperties,
+      String[] hosts,
+      int[] ports, boolean subscriptionEnabled, boolean singleHopEnabled) {
+    ClientCacheFactory cf = new ClientCacheFactory(systemProperties);
+    if (subscriptionEnabled) {
+      cf.setPoolSubscriptionEnabled(true);
+      cf.setPoolSubscriptionRedundancy(-1);
+    }
+    cf.setPoolPRSingleHopEnabled(singleHopEnabled);
+    int hostsLength = hosts.length;
+    for (int i = 0; i < hostsLength; i++) {
+      cf.addPoolLocator(hosts[i], ports[i]);
+    }
+
+    return cf.create();
+  }
+
+
+  void putSerializableObjectAndVerifyLuceneQueryResult(VM putter, String regionName,
+      int expectedRegionSize, int start, int end, VM... vms) throws Exception {
+    // do puts
+    putSerializableObject(putter, regionName, start, end);
+
+    // verify present in others
+    verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, vms);
+  }
+
+  void putSerializableObject(VM putter, String regionName, int start, int end)
+      throws Exception {
+    for (int i = start; i < end; i++) {
+      Class aClass = Thread.currentThread().getContextClassLoader()
+          .loadClass("org.apache.geode.cache.query.data.Portfolio");
+      Constructor portfolioConstructor = aClass.getConstructor(int.class);
+      Object serializableObject = portfolioConstructor.newInstance(i);
+      putter.invoke(invokePut(regionName, i, serializableObject));
+    }
+  }
+
+  private void waitForRegionToHaveExpectedSize(String regionName, int expectedRegionSize) {
+    await().untilAsserted(() -> {
+      Object region =
+          cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
+      int regionSize = (int) region.getClass().getMethod("size").invoke(region);
+      assertEquals("Region size not as expected after 60 seconds", expectedRegionSize,
+          regionSize);
+    });
+  }
+
+  void updateClientSingleHopMetadata(String regionName) {
+    ClientMetadataService cms = ((InternalCache) cache)
+        .getClientMetadataService();
+    cms.scheduleGetPRMetaData(
+        (LocalRegion) ((InternalCache) cache).getRegion(regionName), true);
+    GeodeAwaitility.await("Awaiting ClientMetadataService.isMetadataStable()")
+        .untilAsserted(() -> assertThat(cms.isMetadataStable()).isTrue());
+  }
+
+  void verifyLuceneQueryResults(String regionName, int expectedRegionSize)
+      throws Exception {
+    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
+        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
+    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
+    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
+    luceneService.getClass()
+        .getMethod("waitUntilFlushed", String.class, String.class, long.class, TimeUnit.class)
+        .invoke(luceneService, INDEX_NAME, regionName, 60, TimeUnit.SECONDS);
+    Method createLuceneQueryFactoryMethod =
+        luceneService.getClass().getMethod("createLuceneQueryFactory");
+    createLuceneQueryFactoryMethod.setAccessible(true);
+    Object luceneQueryFactory = createLuceneQueryFactoryMethod.invoke(luceneService);
+    Object luceneQuery = luceneQueryFactory.getClass()
+        .getMethod("create", String.class, String.class, String.class, String.class)
+        .invoke(luceneQueryFactory, INDEX_NAME, regionName, "active", "status");
+
+    Collection resultsActive = executeLuceneQuery(luceneQuery);
+
+    luceneQuery = luceneQueryFactory.getClass()
+        .getMethod("create", String.class, String.class, String.class, String.class)
+        .invoke(luceneQueryFactory, INDEX_NAME, regionName, "inactive", "status");
+
+    Collection resultsInactive = executeLuceneQuery(luceneQuery);
+
+    assertEquals("Result size not as expected ", expectedRegionSize,
+        resultsActive.size() + resultsInactive.size());
+  }
+
+  protected Collection executeLuceneQuery(Object luceneQuery)
+      throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+    Collection results = null;
+    int retryCount = 10;
+    while (true) {
+      try {
+        results = (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
+        break;
+      } catch (Exception ex) {
+        if (!ex.getCause().getMessage().contains("currently indexing")) {
+          throw ex;
+        }
+        if (--retryCount == 0) {
+          throw ex;
+        }
+      }
+    }
+    return results;
+
+  }
+
+  protected void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize,
+      VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(() -> waitForRegionToHaveExpectedSize(regionName, expectedRegionSize));
+      vm.invoke(() -> verifyLuceneQueryResults(regionName, expectedRegionSize));
+    }
+
+  }
+
+  void invokeRunnableInVMs(CacheSerializableRunnable runnable, VM... vms) {
+    for (VM vm : vms) {
+      vm.invoke(runnable);
+    }
+  }
+
+  // Used to close cache and make sure we attempt on all vms even if some do not have a cache
+  void invokeRunnableInVMs(boolean catchErrors, CacheSerializableRunnable runnable,
+      VM... vms) {
+    for (VM vm : vms) {
+      try {
+        vm.invoke(runnable);
+      } catch (Exception e) {
+        if (!catchErrors) {
+          throw e;
+        }
+      }
+    }
+  }
+
+  private VM rollServerToCurrent(VM oldServer, int[] locatorPorts) {
+    // Roll the server
+    oldServer.invoke(invokeCloseCache());
+    VM rollServer = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldServer.getId());
+    rollServer.invoke(invokeCreateCache(locatorPorts == null ? getSystemPropertiesPost71()
+        : getSystemPropertiesPost71(locatorPorts)));
+    rollServer.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
+    return rollServer;
+  }
+
+  VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer,
+      String regionType,
+      File diskdir, String shortcutName, String regionName, int[] locatorPorts,
+      boolean reindex) {
+    VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
+    return createLuceneIndexAndRegionOnRolledServer(regionType, diskdir, shortcutName, regionName,
+        rollServer, reindex);
+  }
+
+  private VM createLuceneIndexAndRegionOnRolledServer(String regionType,
+      File diskdir,
+      String shortcutName, String regionName, VM rollServer, boolean reindex) {
+
+    Boolean serializeIt = reindex;
+    rollServer.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = serializeIt);
+    rollServer.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
+    // recreate region on "rolled" server
+    if ((regionType.equals("persistentPartitioned"))) {
+      CacheSerializableRunnable runnable =
+          invokeCreatePersistentPartitionedRegion(regionName, diskdir);
+      invokeRunnableInVMs(runnable, rollServer);
+    } else {
+      invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
+    }
+    rollServer.invoke(invokeRebalance());
+    return rollServer;
+  }
+
+  VM rollServerToCurrentAndCreateRegionOnly(VM oldServer, String regionType, File diskdir,
+      String shortcutName, String regionName, int[] locatorPorts) {
+    VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
+    // recreate region on "rolled" server
+    if ((regionType.equals("persistentPartitioned"))) {
+      CacheSerializableRunnable runnable =
+          invokeCreatePersistentPartitionedRegion(regionName, diskdir);
+      invokeRunnableInVMs(runnable, rollServer);
+    } else {
+      invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
+    }
+    rollServer.invoke(invokeRebalance());
+    return rollServer;
+  }
+
+  VM rollLocatorToCurrent(VM oldLocator, final String serverHostName, final int port,
+      final String testName, final String locatorString) {
+    // Roll the locator
+    oldLocator.invoke(invokeStopLocator());
+    VM rollLocator = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldLocator.getId());
+    final Properties props = new Properties();
+    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
+    rollLocator.invoke(invokeStartLocator(serverHostName, port, testName, locatorString, props));
+    return rollLocator;
+  }
+
+  // Due to licensing changes
+  private Properties getSystemPropertiesPost71() {
+    Properties props = getSystemProperties();
+    return props;
+  }
+
+  // Due to licensing changes
+  private Properties getSystemPropertiesPost71(int[] locatorPorts) {
+    Properties props = getSystemProperties(locatorPorts);
+    return props;
+  }
+
+  private Properties getSystemProperties() {
+    Properties props = DistributedTestUtils.getAllDistributedSystemProperties(new Properties());
+    props.remove("disable-auto-reconnect");
+    props.remove(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME);
+    props.remove(DistributionConfig.LOCK_MEMORY_NAME);
+    return props;
+  }
+
+  Properties getSystemProperties(int[] locatorPorts) {
+    Properties p = new Properties();
+    String locatorString = getLocatorString(locatorPorts);
+    p.setProperty("locators", locatorString);
+    p.setProperty("mcast-port", "0");
+    return p;
+  }
+
+  static String getLocatorString(int locatorPort) {
+    String locatorString = getDUnitLocatorAddress() + "[" + locatorPort + "]";
+    return locatorString;
+  }
+
+  static String getLocatorString(int[] locatorPorts) {
+    StringBuilder locatorString = new StringBuilder();
+    int numLocators = locatorPorts.length;
+    for (int i = 0; i < numLocators; i++) {
+      locatorString.append(getLocatorString(locatorPorts[i]));
+      if (i + 1 < numLocators) {
+        locatorString.append(",");
+      }
+    }
+    return locatorString.toString();
+  }
+
+  protected CacheSerializableRunnable invokeStartLocator(final String serverHostName,
+      final int port,
+      final String testName, final String locatorsString, final Properties props) {
+    return new CacheSerializableRunnable("execute: startLocator") {
+      @Override
+      public void run2() {
+        try {
+          startLocator(serverHostName, port, testName, locatorsString, props);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port,
+      final Properties props) {
+    return new CacheSerializableRunnable("execute: startLocator") {
+      @Override
+      public void run2() {
+        try {
+          startLocator(serverHostName, port, props);
+        } catch (Exception e) {
+          fail("Error starting locators", e);
+        }
+      }
+    };
+  }
+
+  CacheSerializableRunnable invokeCreateCache(final Properties systemProperties) {
+    return new CacheSerializableRunnable("execute: createCache") {
+      @Override
+      public void run2() {
+        try {
+          LuceneSearchWithRollingUpgradeTestBase.cache = createCache(systemProperties);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private CacheSerializableRunnable invokeAssertVersion(final short version) {
+    return new CacheSerializableRunnable("execute: assertVersion") {
+      @Override
+      public void run2() {
+        try {
+          assertVersion(LuceneSearchWithRollingUpgradeTestBase.cache, version);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  CacheSerializableRunnable invokeCreateRegion(final String regionName,
+      final String shortcutName) {
+    return new CacheSerializableRunnable("execute: createRegion") {
+      @Override
+      public void run2() {
+        try {
+          createRegion(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, shortcutName);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  protected CacheSerializableRunnable invokeCreatePersistentPartitionedRegion(
+      final String regionName,
+      final File diskstore) {
+    return new CacheSerializableRunnable("execute: createPersistentPartitonedRegion") {
+      @Override
+      public void run2() {
+        try {
+          createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeTestBase.cache, regionName,
+              diskstore);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  protected CacheSerializableRunnable invokePut(final String regionName, final Object key,
+      final Object value) {
+    return new CacheSerializableRunnable("execute: put") {
+      @Override
+      public void run2() {
+        try {
+          put(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, key, value);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  CacheSerializableRunnable invokeStopLocator() {
+    return new CacheSerializableRunnable("execute: stopLocator") {
+      @Override
+      public void run2() {
+        try {
+          stopLocator();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  CacheSerializableRunnable invokeCloseCache() {
+    return new CacheSerializableRunnable("execute: closeCache") {
+      @Override
+      public void run2() {
+        try {
+          closeCache(LuceneSearchWithRollingUpgradeTestBase.cache);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  protected CacheSerializableRunnable invokeRebalance() {
+    return new CacheSerializableRunnable("execute: rebalance") {
+      @Override
+      public void run2() {
+        try {
+          rebalance(LuceneSearchWithRollingUpgradeTestBase.cache);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  protected void deleteDiskStores() {
+    try {
+      FileUtils.deleteDirectory(new File(diskDir).getAbsoluteFile());
+    } catch (IOException e) {
+      throw new Error("Error deleting files", e);
+    }
+  }
+
+  protected static Object createCache(Properties systemProperties) throws Exception {
+
+    Class distConfigClass = Thread.currentThread().getContextClassLoader()
+        .loadClass("org.apache.geode.distributed.internal.DistributionConfigImpl");
+    boolean disableConfig = true;
+    try {
+      distConfigClass.getDeclaredField("useSharedConfiguration");
+    } catch (NoSuchFieldException e) {
+      disableConfig = false;
+    }
+    if (disableConfig) {
+      systemProperties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
+    }
+
+    Class cacheFactoryClass = Thread.currentThread().getContextClassLoader()
+        .loadClass("org.apache.geode.cache.CacheFactory");
+    Constructor constructor = cacheFactoryClass.getConstructor(Properties.class);
+    constructor.setAccessible(true);
+    Object cacheFactory = constructor.newInstance(systemProperties);
+
+    Method createMethod = cacheFactoryClass.getMethod("create");
+    createMethod.setAccessible(true);
+    Object cache = createMethod.invoke(cacheFactory);
+    return cache;
+  }
+
+  protected static Object getRegion(Object cache, String regionName) throws Exception {
+    return cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
+  }
+
+  protected static Object put(Object cache, String regionName, Object key, Object value)
+      throws Exception {
+    Object region = getRegion(cache, regionName);
+    return region.getClass().getMethod("put", Object.class, Object.class).invoke(region, key,
+        value);
+  }
+
+  protected static void createRegion(Object cache, String regionName, String shortcutName)
+      throws Exception {
+    Class aClass = Thread.currentThread().getContextClassLoader()
+        .loadClass("org.apache.geode.cache.RegionShortcut");
+    Object[] enumConstants = aClass.getEnumConstants();
+    Object shortcut = null;
+    int length = enumConstants.length;
+    for (int i = 0; i < length; i++) {
+      Object constant = enumConstants[i];
+      if (((Enum) constant).name().equals(shortcutName)) {
+        shortcut = constant;
+        break;
+      }
+    }
+
+    Method createRegionFactoryMethod = cache.getClass().getMethod("createRegionFactory", aClass);
+    createRegionFactoryMethod.setAccessible(true);
+    Object regionFactory = createRegionFactoryMethod.invoke(cache, shortcut);
+    Method createMethod = regionFactory.getClass().getMethod("create", String.class);
+    createMethod.setAccessible(true);
+    createMethod.invoke(regionFactory, regionName);
+  }
+
+  static void createLuceneIndex(Object cache, String regionName, String indexName)
+      throws Exception {
+    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
+        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
+    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
+    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
+    Method createLuceneIndexFactoryMethod =
+        luceneService.getClass().getMethod("createIndexFactory");
+    createLuceneIndexFactoryMethod.setAccessible(true);
+    Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
+    luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
+        "status");
+    luceneIndexFactory.getClass().getMethod("create", String.class, String.class)
+        .invoke(luceneIndexFactory, indexName, regionName);
+  }
+
+  static void createLuceneIndexOnExistingRegion(Object cache, String regionName,
+      String indexName) throws Exception {
+    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
+        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
+    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
+    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
+    Method createLuceneIndexFactoryMethod =
+        luceneService.getClass().getMethod("createIndexFactory");
+    createLuceneIndexFactoryMethod.setAccessible(true);
+    Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
+    luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
+        "status");
+    luceneIndexFactory.getClass().getMethod("create", String.class, String.class, boolean.class)
+        .invoke(luceneIndexFactory, indexName, regionName, true);
+  }
+
+  protected static void createPersistentPartitonedRegion(Object cache, String regionName,
+      File diskStore) throws Exception {
+    Object store = cache.getClass().getMethod("findDiskStore", String.class).invoke(cache, "store");
+    Class dataPolicyObject = Thread.currentThread().getContextClassLoader()
+        .loadClass("org.apache.geode.cache.DataPolicy");
+    Object dataPolicy = dataPolicyObject.getField("PERSISTENT_PARTITION").get(null);
+    if (store == null) {
+      Object dsf = cache.getClass().getMethod("createDiskStoreFactory").invoke(cache);
+      dsf.getClass().getMethod("setMaxOplogSize", long.class).invoke(dsf, 1L);
+      dsf.getClass().getMethod("setDiskDirs", File[].class).invoke(dsf,
+          new Object[] {new File[] {diskStore.getAbsoluteFile()}});
+      dsf.getClass().getMethod("create", String.class).invoke(dsf, "store");
+    }
+    Object rf = cache.getClass().getMethod("createRegionFactory").invoke(cache);
+    rf.getClass().getMethod("setDiskStoreName", String.class).invoke(rf, "store");
+    rf.getClass().getMethod("setDataPolicy", dataPolicy.getClass()).invoke(rf, dataPolicy);
+    rf.getClass().getMethod("create", String.class).invoke(rf, regionName);
+  }
+
+  protected static void assertVersion(Object cache, short ordinal) throws Exception {
+    Class idmClass = Thread.currentThread().getContextClassLoader()
+        .loadClass("org.apache.geode.distributed.internal.membership.InternalDistributedMember");
+    Method getDSMethod = cache.getClass().getMethod("getDistributedSystem");
+    getDSMethod.setAccessible(true);
+    Object ds = getDSMethod.invoke(cache);
+
+    Method getDistributedMemberMethod = ds.getClass().getMethod("getDistributedMember");
+    getDistributedMemberMethod.setAccessible(true);
+    Object member = getDistributedMemberMethod.invoke(ds);
+    Method getVersionObjectMethod = member.getClass().getMethod("getVersionObject");
+    getVersionObjectMethod.setAccessible(true);
+    Object thisVersion = getVersionObjectMethod.invoke(member);
+    Method getOrdinalMethod = thisVersion.getClass().getMethod("ordinal");
+    getOrdinalMethod.setAccessible(true);
+    short thisOrdinal = (Short) getOrdinalMethod.invoke(thisVersion);
+    if (ordinal != thisOrdinal) {
+      throw new Error(
+          "Version ordinal:" + thisOrdinal + " was not the expected ordinal of:" + ordinal);
+    }
+  }
+
+  protected static void stopCacheServers(Object cache) throws Exception {
+    Method getCacheServersMethod = cache.getClass().getMethod("getCacheServers");
+    getCacheServersMethod.setAccessible(true);
+    List cacheServers = (List) getCacheServersMethod.invoke(cache);
+    Method stopMethod = null;
+    for (Object cs : cacheServers) {
+      if (stopMethod == null) {
+        stopMethod = cs.getClass().getMethod("stop");
+      }
+      stopMethod.setAccessible(true);
+      stopMethod.invoke(cs);
+    }
+  }
+
+  protected static void closeCache(Object cache) throws Exception {
+    if (cache == null) {
+      return;
+    }
+    Method isClosedMethod = cache.getClass().getMethod("isClosed");
+    isClosedMethod.setAccessible(true);
+    boolean cacheClosed = (Boolean) isClosedMethod.invoke(cache);
+    if (cache != null && !cacheClosed) {
+      stopCacheServers(cache);
+      Method method = cache.getClass().getMethod("close");
+      method.setAccessible(true);
+      method.invoke(cache);
+      long startTime = System.currentTimeMillis();
+      while (!cacheClosed && System.currentTimeMillis() - startTime < 30000) {
+        try {
+          Thread.sleep(1000);
+          Method cacheClosedMethod = cache.getClass().getMethod("isClosed");
+          cacheClosedMethod.setAccessible(true);
+          cacheClosed = (Boolean) cacheClosedMethod.invoke(cache);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  protected static void rebalance(Object cache) throws Exception {
+    Method getRMMethod = cache.getClass().getMethod("getResourceManager");
+    getRMMethod.setAccessible(true);
+    Object manager = getRMMethod.invoke(cache);
+
+    Method createRebalanceFactoryMethod = manager.getClass().getMethod("createRebalanceFactory");
+    createRebalanceFactoryMethod.setAccessible(true);
+    Object rebalanceFactory = createRebalanceFactoryMethod.invoke(manager);
+    Method m = rebalanceFactory.getClass().getMethod("start");
+    m.setAccessible(true);
+    Object op = m.invoke(rebalanceFactory);
+
+    // Wait until the rebalance is complete
+    try {
+      Method getResultsMethod = op.getClass().getMethod("getResults");
+      getResultsMethod.setAccessible(true);
+      Object results = getResultsMethod.invoke(op);
+      Method getTotalTimeMethod = results.getClass().getMethod("getTotalTime");
+      getTotalTimeMethod.setAccessible(true);
+      System.out.println("Took " + getTotalTimeMethod.invoke(results) + " milliseconds\n");
+      Method getTotalBucketsMethod = results.getClass().getMethod("getTotalBucketTransferBytes");
+      getTotalBucketsMethod.setAccessible(true);
+      System.out.println("Transfered " + getTotalBucketsMethod.invoke(results) + "bytes\n");
+    } catch (Exception e) {
+      Thread.currentThread().interrupt();
+      throw e;
+    }
+  }
+
+  /**
+   * Starts a locator with given configuration.
+   */
+  protected static void startLocator(final String serverHostName, final int port,
+      final String testName, final String locatorsString, final Properties props) throws Exception {
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
+    Logger logger = LogService.getLogger();
+    props.setProperty(DistributionConfig.LOG_LEVEL_NAME, logger.getLevel().name());
+
+    InetAddress bindAddr;
+    try {
+      bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
+    } catch (UnknownHostException uhe) {
+      throw new Error("While resolving bind address ", uhe);
+    }
+
+    File logFile = new File(testName + "-locator" + port + ".log");
+    Class locatorClass = Thread.currentThread().getContextClassLoader()
+        .loadClass("org.apache.geode.distributed.Locator");
+    Method startLocatorAndDSMethod =
+        locatorClass.getMethod("startLocatorAndDS", int.class, File.class, InetAddress.class,
+            Properties.class, boolean.class, boolean.class, String.class);
+    startLocatorAndDSMethod.setAccessible(true);
+    startLocatorAndDSMethod.invoke(null, port, logFile, bindAddr, props, true, true, null);
+  }
+
+  protected static void startLocator(final String serverHostName, final int port, Properties props)
+      throws Exception {
+
+
+    InetAddress bindAddr = null;
+    try {
+      bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
+    } catch (UnknownHostException uhe) {
+      throw new Error("While resolving bind address ", uhe);
+    }
+
+    Locator.startLocatorAndDS(port, new File(""), bindAddr, props, true, true, null);
+    Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available
+  }
+
+  protected static void stopLocator() throws Exception {
+    Class internalLocatorClass = Thread.currentThread().getContextClassLoader()
+        .loadClass("org.apache.geode.distributed.internal.InternalLocator");
+    Method locatorMethod = internalLocatorClass.getMethod("getLocator");
+    locatorMethod.setAccessible(true);
+    Object locator = locatorMethod.invoke(null);
+    Method stopLocatorMethod = locator.getClass().getMethod("stop");
+    stopLocatorMethod.setAccessible(true);
+    stopLocatorMethod.invoke(locator);
+  }
+
+  /**
+   * Get the port that the standard dunit locator is listening on.
+   *
+   * @return locator address
+   */
+  protected static String getDUnitLocatorAddress() {
+    return Host.getHost(0).getHostName();
+  }
+
+  protected Object executeDummyFunction(String regionName) {
+    Region region = ((InternalCache) cache).getRegion(regionName);
+    Object result = FunctionService.onRegion(region).execute("TestFunction").getResult();
+    ArrayList<Boolean> list = (ArrayList) result;
+    assertThat(list.get(0)).isTrue();
+    return result;
+  }
+
+  static class TestFunction implements Function, Declarable {
+
+    public void execute(FunctionContext context) {
+      context.getResultSender().lastResult(true);
+    }
+
+    public String getId() {
+      return getClass().getSimpleName();
+    }
+
+    public boolean optimizeForWrite() {
+      return true;
+    }
+  }
+
+}
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java
index cb68055..8178ea6 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java
@@ -84,7 +84,7 @@
           locatorString);
 
       server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
       expectedRegionSize += 10;
       putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
           25, server2);
@@ -97,7 +97,7 @@
           30, server1);
 
       server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
       expectedRegionSize += 5;
       putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 25,
           35, server1, server2);
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java
new file mode 100644
index 0000000..b2e3f94
--- /dev/null
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+import org.apache.geode.test.version.VersionManager;
+
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion
+    extends LuceneSearchWithRollingUpgradeTestBase {
+
+  @Parameterized.Parameter()
+  public Boolean reindex;
+
+  @Parameterized.Parameter(1)
+  public Boolean singleHopEnabled;
+
+  @Parameterized.Parameters(name = "currentVersion, reindex={0}, singleHopEnabled={1}")
+  public static Collection<Object[]> data() {
+    Collection<Object[]> rval = new ArrayList<>();
+    rval.add(new Object[] {true, true});
+    rval.add(new Object[] {true, false});
+    rval.add(new Object[] {false, true});
+    rval.add(new Object[] {false, false});
+    return rval;
+  }
+
+  @Test
+  public void functionsFailOverWhenRestartOneServer()
+      throws Exception {
+    // Since the changes relating to GEODE-7258 is not applied on 1.10.0,
+    // use this test to roll from develop to develop to verify.
+    final Host host = Host.getHost(0);
+    VM locator = host.getVM(VersionManager.CURRENT_VERSION, 0);
+    VM server1 = host.getVM(VersionManager.CURRENT_VERSION, 1);
+    VM server2 = host.getVM(VersionManager.CURRENT_VERSION, 2);
+    VM client = host.getVM(VersionManager.CURRENT_VERSION, 3);
+
+    final String regionName = "aRegion";
+    String regionType = "partitionedRedundant";
+    RegionShortcut shortcut = RegionShortcut.PARTITION_REDUNDANT;
+
+    int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+    int[] locatorPorts = new int[] {ports[0]};
+    int[] csPorts = new int[] {ports[1], ports[2]};
+
+    locator.invoke(() -> DistributedTestUtils.deleteLocatorStateFile(locatorPorts));
+
+    String hostName = NetworkUtils.getServerHostName(host);
+    String[] hostNames = new String[] {hostName};
+    String locatorString = getLocatorString(locatorPorts);
+
+    try {
+      // Start locator, servers and client in old version
+      locator.invoke(
+          invokeStartLocator(hostName, locatorPorts[0], getLocatorPropertiesPre91(locatorString)));
+
+      // Locators before 1.4 handled configuration asynchronously.
+      // We must wait for configuration configuration to be ready, or confirm that it is disabled.
+      locator.invoke(
+          () -> await()
+              .untilAsserted(() -> assertTrue(
+                  !InternalLocator.getLocator().getConfig().getEnableClusterConfiguration()
+                      || InternalLocator.getLocator().isSharedConfigurationRunning())));
+
+      invokeRunnableInVMs(invokeCreateCache(getSystemProperties(locatorPorts)), server1, server2);
+      invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1);
+      invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2);
+      invokeRunnableInVMs(
+          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+              singleHopEnabled),
+          client);
+
+      // Create the index on the servers
+      server1.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
+      server2.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
+
+      // Create the region on the servers and client
+      invokeRunnableInVMs(invokeCreateRegion(regionName, shortcut.name()), server1, server2);
+      invokeRunnableInVMs(invokeCreateClientRegion(regionName, ClientRegionShortcut.PROXY), client);
+
+      // Put objects on the client so that each bucket is created
+      int numObjects = 113;
+      putSerializableObject(client, regionName, 0, numObjects);
+
+      // Execute a query on the client and verify the results. This also waits until flushed.
+      client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));
+
+      // Roll the locator and server 1 to current version
+      locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0], getTestMethodName(),
+          locatorString);
+      server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null,
+          shortcut.name(), regionName, locatorPorts, reindex);
+
+      // Execute a query on the client and verify the results. This also waits until flushed.
+      client.invoke(() -> {
+        updateClientSingleHopMetadata(regionName);
+        verifyLuceneQueryResults(regionName, numObjects);
+      });
+
+      // Put some objects on the client. This will update the document to the latest lucene version
+      putSerializableObject(client, regionName, 0, numObjects);
+
+      // Execute a query on the client and verify the results. This also waits until flushed.
+      client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));
+
+      // Close server 1 cache. This will force server 2 (old version) to become primary
+      invokeRunnableInVMs(true, invokeCloseCache(), server1);
+
+      // Execute a query on the client and verify the results
+      client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));
+    } finally {
+      invokeRunnableInVMs(true, invokeStopLocator(), locator);
+      invokeRunnableInVMs(true, invokeCloseCache(), client, server2);
+    }
+  }
+
+  @Test
+  public void stopOneServerThenRerunFunction()
+      throws Throwable {
+    // Since the changes relating to GEODE-7258 is not applied on 1.10.0,
+    // use this test to roll from develop to develop to verify.
+    final Host host = Host.getHost(0);
+    VM locator = host.getVM(VersionManager.CURRENT_VERSION, 0);
+    VM server1 = host.getVM(VersionManager.CURRENT_VERSION, 1);
+    VM server2 = host.getVM(VersionManager.CURRENT_VERSION, 2);
+    VM client = host.getVM(VersionManager.CURRENT_VERSION, 3);
+
+    final String regionName = "aRegion";
+    String regionType = "partitionedRedundant";
+    RegionShortcut shortcut = RegionShortcut.PARTITION_REDUNDANT;
+
+    int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+    int[] locatorPorts = new int[] {ports[0]};
+    int[] csPorts = new int[] {ports[1], ports[2]};
+
+    locator
+        .invoke(() -> DistributedTestUtils.deleteLocatorStateFile(locatorPorts));
+
+    String hostName = NetworkUtils.getServerHostName(host);
+    String[] hostNames = new String[] {hostName};
+    String locatorString = getLocatorString(locatorPorts);
+
+    try {
+      // Start locator, servers and client in old version
+      locator.invoke(
+          invokeStartLocator(hostName, locatorPorts[0],
+              getLocatorPropertiesPre91(locatorString)));
+
+      // Locators before 1.4 handled configuration asynchronously.
+      // We must wait for configuration configuration to be ready, or confirm that it is disabled.
+      locator.invoke(
+          () -> await()
+              .untilAsserted(() -> assertTrue(
+                  !InternalLocator.getLocator().getConfig()
+                      .getEnableClusterConfiguration()
+                      || InternalLocator.getLocator().isSharedConfigurationRunning())));
+
+      invokeRunnableInVMs(invokeCreateCache(getSystemProperties(locatorPorts)),
+          server1, server2);
+      invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1);
+      invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2);
+      invokeRunnableInVMs(
+          invokeCreateClientCache(getClientSystemProperties(), hostNames,
+              locatorPorts, false,
+              singleHopEnabled),
+          client);
+
+      // Create the region on the servers and client
+      invokeRunnableInVMs(invokeCreateRegion(regionName, shortcut.name()),
+          server1, server2);
+      invokeRunnableInVMs(
+          invokeCreateClientRegion(regionName, ClientRegionShortcut.PROXY),
+          client);
+      server1
+          .invoke(() -> FunctionService.registerFunction(new TestFunction()));
+      server2
+          .invoke(() -> FunctionService.registerFunction(new TestFunction()));
+
+      // Put objects on the client so that each bucket is created
+      int numObjects = 113;
+      putSerializableObject(client, regionName, 0, numObjects);
+
+      client.invoke(
+          () -> await().untilAsserted(() -> {
+            ArrayList<Boolean> result = (ArrayList<Boolean>) executeDummyFunction(
+                regionName);
+            assertTrue(result.size() == 2);
+          }));
+
+      server1.invoke(invokeCloseCache());
+
+      client.invoke(() -> {
+        Object result = executeDummyFunction(regionName);
+        ArrayList<Boolean> list = (ArrayList) result;
+        assertThat(list.get(0)).isTrue();
+      });
+    } finally {
+      invokeRunnableInVMs(true, invokeStopLocator(), locator);
+      invokeRunnableInVMs(true, invokeCloseCache(), client, server2);
+    }
+  }
+}
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
index 95c0498..df1e329 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
@@ -70,7 +70,8 @@
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3);
 
       invokeRunnableInVMs(
-          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false),
+          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+              singleHopEnabled),
           client);
       server2.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
       server3.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
@@ -87,7 +88,7 @@
           locatorString);
 
       server3 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server3, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3);
       expectedRegionSize += 10;
       putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 20,
@@ -97,7 +98,7 @@
           40, server2);
 
       server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server2);
       expectedRegionSize += 10;
       putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 40,
@@ -107,7 +108,7 @@
           60, server3);
 
       client = rollClientToCurrentAndCreateRegion(client, ClientRegionShortcut.PROXY, regionName,
-          hostNames, locatorPorts, false);
+          hostNames, locatorPorts, false, singleHopEnabled);
       expectedRegionSize += 10;
       putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 60,
           70, server2, server3);
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
index 3acba6c..430e8fa 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
@@ -17,7 +17,6 @@
 import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
 import static org.junit.Assert.assertTrue;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.geode.cache.RegionShortcut;
@@ -32,7 +31,6 @@
 public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated
     extends LuceneSearchWithRollingUpgradeDUnit {
 
-  @Ignore("Disabled until GEODE-7258 is fixed")
   @Test
   public void test()
       throws Exception {
@@ -85,7 +83,8 @@
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1);
       invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2);
       invokeRunnableInVMs(
-          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false),
+          invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+              singleHopEnabled),
           client);
 
       // Create the index on the servers
@@ -107,7 +106,7 @@
       locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0], getTestMethodName(),
           locatorString);
       server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null,
-          shortcut.name(), regionName, locatorPorts);
+          shortcut.name(), regionName, locatorPorts, reindex);
 
       // Execute a query on the client and verify the results. This also waits until flushed.
       client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));