Feature/geode 7258 2 (#4250)
GEODE-7258: The function retry logic is modified to handle exceptions
thrown while trying to connect to a server that is shut down/closed.
Co-authored-by: Anil <agingade@pivotal.io>
Co-authored-by: Xiaojian Zhou <gzhou@pivotal.io>
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
index 35837ac..77cc175 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientMetadataService.java
@@ -30,6 +30,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.FixedPartitionResolver;
@@ -493,6 +494,10 @@
return bucketId;
}
+ @VisibleForTesting
+ public void scheduleGetPRMetaData(final LocalRegion region, final boolean isRecursive) {
+ scheduleGetPRMetaData((InternalRegion) region, isRecursive);
+ }
public void scheduleGetPRMetaData(final InternalRegion region, final boolean isRecursive) {
if (this.nonPRs.contains(region.getFullPath())) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
index b920343..e16d6cd 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteFunctionOp.java
@@ -24,7 +24,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.geode.InternalGemFireError;
-import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.execute.Function;
@@ -60,6 +59,8 @@
/** index of ignoreFailedMembers in flags[] */
public static final int IGNORE_FAILED_MEMBERS_INDEX = 1;
+ private static final int MAX_RETRY_INITIAL_VALUE = -1;
+
private ExecuteFunctionOp() {
// no instances allowed
}
@@ -83,8 +84,8 @@
} else {
boolean reexecute = false;
+ int maxRetryAttempts = MAX_RETRY_INITIAL_VALUE;
- int maxRetryAttempts = pool.getRetryAttempts();
if (!isHA) {
maxRetryAttempts = 0;
}
@@ -107,11 +108,8 @@
} catch (ServerConnectivityException se) {
- if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
- // If the retryAttempt is set to default(-1). Try it on all servers once.
- // Calculating number of servers when function is re-executed as it involves
- // messaging locator.
- maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1;
+ if (maxRetryAttempts == MAX_RETRY_INITIAL_VALUE) {
+ maxRetryAttempts = pool.calculateRetryAttempts(se);
}
if ((maxRetryAttempts--) < 1) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
index f4d7520..c40df1c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
@@ -27,7 +27,6 @@
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.client.NoAvailableServersException;
-import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.client.internal.ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl;
@@ -60,6 +59,8 @@
private static final Logger logger = LogService.getLogger();
+ private static final int MAX_RETRY_INITIAL_VALUE = -1;
+
private ExecuteRegionFunctionOp() {
// no instances allowed
}
@@ -67,17 +68,14 @@
/**
* Does a execute Function on a server using connections from the given pool to communicate with
* the server.
- *
- * @param pool the pool to use to communicate with the server.
- * @param resultCollector is used to collect the results from the Server
- * @param maxRetryAttempts Maximum number of retry attempts
*/
static void execute(ExecutablePool pool,
ResultCollector resultCollector,
- int maxRetryAttempts, boolean isHA,
+ int retryAttempts, boolean isHA,
ExecuteRegionFunctionOpImpl op, boolean isReexecute,
Set<String> failedNodes) {
+ int maxRetryAttempts = retryAttempts > 0 ? retryAttempts : MAX_RETRY_INITIAL_VALUE;
if (!isHA) {
maxRetryAttempts = 0;
}
@@ -107,11 +105,8 @@
throw failedException;
} catch (ServerConnectivityException se) {
- if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
- // If the retryAttempt is set to default(-1). Try it on all servers once.
- // Calculating number of servers when function is re-executed as it involves
- // messaging locator.
- maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1;
+ if (maxRetryAttempts == MAX_RETRY_INITIAL_VALUE) {
+ maxRetryAttempts = ((PoolImpl) pool).calculateRetryAttempts(se);
}
if ((maxRetryAttempts--) < 1) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
index f41a829..2d5abf1 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOp.java
@@ -66,7 +66,6 @@
ServerRegionFunctionExecutor serverRegionExecutor,
ResultCollector resultCollector,
Map<ServerLocation, ? extends HashSet> serverToFilterMap,
- int mRetryAttempts,
boolean isHA,
final java.util.function.Function<ServerRegionFunctionExecutor, AbstractOp> regionFunctionSingleHopOpFunction,
final Supplier<AbstractOp> executeRegionFunctionOpSupplier) {
@@ -87,7 +86,7 @@
final int retryAttempts =
SingleHopClientExecutor.submitAllHA(callableTasks, (LocalRegion) region, isHA,
- resultCollector, failedNodes, mRetryAttempts, ((PoolImpl) pool));
+ resultCollector, failedNodes, ((PoolImpl) pool));
if (isDebugEnabled) {
logger.debug(
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
index fbb7d8a..cc0d5d5 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PoolImpl.java
@@ -41,6 +41,7 @@
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionService;
import org.apache.geode.cache.client.Pool;
+import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.SubscriptionNotEnabledException;
import org.apache.geode.cache.client.internal.pooling.ConnectionManager;
@@ -1581,4 +1582,27 @@
public int getSubscriptionTimeoutMultiplier() {
return subscriptionTimeoutMultiplier;
}
+
+  public int calculateRetryAttempts(Throwable cause) {
+
+    int maxRetryAttempts = getRetryAttempts();
+
+    if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
+      // If retryAttempts is the default (-1), try executing on all servers once.
+      // As calculating the number of servers involves messaging the locator, it
+      // is done only when there is an exception.
+      if (cause instanceof ServerConnectivityException && cause.getMessage() != null
+          && cause.getMessage().contains(ConnectionManagerImpl.SOCKET_TIME_OUT_MSG)) {
+        // The request was sent once before the socket timed out.
+        maxRetryAttempts = getConnectionSource().getAllServers().size() - 1;
+      } else {
+        // The client was unable to establish a connection before sending the
+        // request, so the request has not been attempted on any server yet.
+        maxRetryAttempts = getConnectionSource().getAllServers().size();
+      }
+    }
+
+    return maxRetryAttempts;
+  }
+
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
index dd658ea..4ea852c 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ServerRegionProxy.java
@@ -699,7 +699,7 @@
hasResult, emptySet(), true, timeoutMs);
ExecuteRegionFunctionSingleHopOp.execute(pool, region, serverRegionExecutor,
- resultCollector, serverToBuckets, retryAttempts, function.isHA(),
+ resultCollector, serverToBuckets, function.isHA(),
regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
}
} else {
@@ -725,7 +725,7 @@
hasResult, emptySet(), isBucketFilter, timeoutMs);
ExecuteRegionFunctionSingleHopOp.execute(pool, region,
- serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts,
+ serverRegionExecutor, resultCollector, serverToFilterMap,
function.isHA(), regionFunctionSingleHopOpFunction,
executeRegionFunctionOpSupplier);
}
@@ -786,7 +786,7 @@
emptySet(), true, isHA, optimizeForWrite, timeoutMs);
ExecuteRegionFunctionSingleHopOp.execute(pool, region,
- serverRegionExecutor, resultCollector, serverToBuckets, retryAttempts, isHA,
+ serverRegionExecutor, resultCollector, serverToBuckets, isHA,
regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
}
@@ -810,7 +810,7 @@
emptySet(), isBucketsAsFilter, isHA, optimizeForWrite, timeoutMs);
ExecuteRegionFunctionSingleHopOp.execute(pool, region,
- serverRegionExecutor, resultCollector, serverToFilterMap, retryAttempts,
+ serverRegionExecutor, resultCollector, serverToFilterMap,
isHA, regionFunctionSingleHopOpFunction, executeRegionFunctionOpSupplier);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
index e5050eb..3799d5d 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/SingleHopClientExecutor.java
@@ -30,7 +30,6 @@
import org.apache.geode.InternalGemFireException;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.CacheClosedException;
-import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.cache.client.internal.GetAllOp.GetAllOpImpl;
@@ -50,6 +49,8 @@
private static final Logger logger = LogService.getLogger();
+ private static final int MAX_RETRY_INITIAL_VALUE = -1;
+
@MakeNotStatic
static final ExecutorService execService =
LoggingExecutors.newCachedThreadPool("Function Execution Thread-", true);
@@ -89,11 +90,10 @@
static int submitAllHA(List callableTasks, LocalRegion region, boolean isHA,
ResultCollector rc, Set<String> failedNodes,
- final int retryAttemptsArg,
final PoolImpl pool) {
- ClientMetadataService cms = region.getCache().getClientMetadataService();
- int maxRetryAttempts = 0;
+ ClientMetadataService cms;
+ int maxRetryAttempts = MAX_RETRY_INITIAL_VALUE;
if (callableTasks != null && !callableTasks.isEmpty()) {
List futures = null;
@@ -120,15 +120,8 @@
throw new InternalGemFireException(e.getMessage());
} catch (ExecutionException ee) {
- if (maxRetryAttempts == 0) {
- maxRetryAttempts = retryAttemptsArg;
- }
-
- if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
- // If the retryAttempt is set to default(-1). Try it on all servers once.
- // Calculating number of servers when function is re-executed as it involves
- // messaging locator.
- maxRetryAttempts = pool.getConnectionSource().getAllServers().size() - 1;
+ if (maxRetryAttempts == MAX_RETRY_INITIAL_VALUE) {
+ maxRetryAttempts = pool.calculateRetryAttempts(ee.getCause());
}
if (ee.getCause() instanceof InternalFunctionInvocationTargetException) {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
index 438980c..52e1684 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -70,6 +70,10 @@
public class ConnectionManagerImpl implements ConnectionManager {
private static final Logger logger = LogService.getLogger();
private static final int NOT_WAITING = -1;
+ public static final String BORROW_CONN_ERROR_MSG = "Could not create a new connection to server ";
+ public static final String UNEXPECTED_SOCKET_CLOSED_MSG =
+ "Pool unexpected closed socket on server";
+ public static final String SOCKET_TIME_OUT_MSG = "socket timed out on client";
private final String poolName;
private final PoolStats poolStats;
@@ -321,8 +325,7 @@
return connection;
}
- throw new ServerConnectivityException(
- "Could not create a new connection to server " + server);
+ throw new ServerConnectivityException(BORROW_CONN_ERROR_MSG + server);
}
@Override
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
index ae9ae13..bb11a04 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionOpRetryTest.java
@@ -228,9 +228,8 @@
testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
(pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool
.execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())),
- failureMode));
-
- when(testSupport.getExecutablePool().getRetryAttempts()).thenReturn(retryAttempts);
+ failureMode),
+ retryAttempts);
args = null;
memberMappedArg = mock(MemberMappedArgument.class);
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
index c91816f..7b8bd10 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteFunctionTestSupport.java
@@ -15,6 +15,7 @@
package org.apache.geode.cache.client.internal;
import static org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.HAStatus.HA;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -29,6 +30,7 @@
import org.apache.geode.cache.client.NoAvailableServersException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
+import org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.internal.ServerLocation;
@@ -91,8 +93,6 @@
* This method has to be {@code static} because it is called before
* {@link ExecuteFunctionTestSupport} is constructed.
*
- * @param whenPoolExecute is the {@link OngoingStubbing} for (one of the ) {@code execute()}
- * methods on {@link PoolImpl}
* @param failureMode is the {@link FailureMode} that determines the kind of exception
* to {@code throw}
*/
@@ -118,7 +118,8 @@
whenPoolExecute.thenReturn(null);
break;
case THROW_SERVER_CONNECTIVITY_EXCEPTION:
- whenPoolExecute.thenThrow(new ServerConnectivityException("testing"));
+ whenPoolExecute.thenThrow(new ServerConnectivityException(
+ ConnectionManagerImpl.SOCKET_TIME_OUT_MSG));
break;
case THROW_SERVER_OPERATION_EXCEPTION:
whenPoolExecute.thenThrow(new ServerOperationException("testing"));
@@ -138,7 +139,7 @@
* we throw this exception first, then we throw ServerConnectivityException
*/
whenPoolExecute.thenThrow(new InternalFunctionInvocationTargetException("testing"))
- .thenThrow(new ServerConnectivityException("testing"));
+ .thenThrow(new ServerConnectivityException(ConnectionManagerImpl.SOCKET_TIME_OUT_MSG));
break;
default:
throw new AssertionError("unknown FailureMode type: " + failureMode);
@@ -149,7 +150,7 @@
ExecuteFunctionTestSupport(
final HAStatus haStatus,
final FailureMode failureMode,
- final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior) {
+ final BiConsumer<PoolImpl, FailureMode> addPoolMockBehavior, Integer retryAttempts) {
final List<ServerLocation> servers = (List<ServerLocation>) mock(List.class);
when(servers.size()).thenReturn(ExecuteFunctionTestSupport.NUMBER_OF_SERVERS);
@@ -174,6 +175,9 @@
executablePool = mock(PoolImpl.class);
when(executablePool.getConnectionSource()).thenReturn(connectionSource);
+ when(executablePool.getRetryAttempts()).thenReturn(retryAttempts);
+ when(executablePool.calculateRetryAttempts(any(ServerConnectivityException.class)))
+ .thenCallRealMethod();
addPoolMockBehavior.accept(executablePool, failureMode);
}
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
index 74f6748..60b464e 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpRetryTest.java
@@ -41,6 +41,7 @@
import org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.FailureMode;
import org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.FunctionIdentifierType;
import org.apache.geode.cache.client.internal.ExecuteFunctionTestSupport.HAStatus;
+import org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.internal.cache.execute.InternalFunctionInvocationTargetException;
@@ -284,7 +285,8 @@
when.thenReturn(null);
break;
case THROW_SERVER_CONNECTIVITY_EXCEPTION:
- when.thenThrow(new ServerConnectivityException("testing"));
+ when.thenThrow(new ServerConnectivityException(
+ ConnectionManagerImpl.SOCKET_TIME_OUT_MSG));
break;
case THROW_SERVER_OPERATION_EXCEPTION:
when.thenThrow(new ServerOperationException("testing"));
@@ -304,12 +306,13 @@
* we throw this exception first, then we throw ServerConnectivityException
*/
when.thenThrow(new InternalFunctionInvocationTargetException("testing"))
- .thenThrow(new ServerConnectivityException("testing"));
+ .thenThrow(
+ new ServerConnectivityException(ConnectionManagerImpl.SOCKET_TIME_OUT_MSG));
break;
default:
throw new AssertionError("unknown FailureMode type: " + failureMode);
}
- });
+ }, retryAttempts);
executeFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
testSupport.getExecutablePool(),
@@ -325,7 +328,8 @@
testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
(pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(pool
.execute(ArgumentMatchers.<AbstractOp>any(), ArgumentMatchers.anyInt())),
- failureMode));
+ failureMode),
+ retryAttempts);
reExecuteFunctionMultiHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
testSupport.getExecutablePool(),
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
index aef00fb..5649b1b 100644
--- a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionSingleHopOpRetryTest.java
@@ -137,7 +137,7 @@
}
private void createMocks(final HAStatus haStatus,
- final FailureMode failureModeArg) {
+ final FailureMode failureModeArg, Integer retryAttempts) {
testSupport = new ExecuteFunctionTestSupport(haStatus, failureModeArg,
(pool, failureMode) -> ExecuteFunctionTestSupport.thenThrow(when(
@@ -146,7 +146,8 @@
ArgumentMatchers.any(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.anyBoolean())),
- failureMode));
+ failureMode),
+ retryAttempts);
serverToFilterMap = new HashMap<>();
serverToFilterMap.put(new ServerLocation("host1", 10), new HashSet<>());
@@ -158,7 +159,7 @@
final int retryAttempts, final int expectTries,
final FailureMode failureMode) {
- createMocks(haStatus, failureMode);
+ createMocks(haStatus, failureMode, retryAttempts);
executeFunctionSingleHopAndValidate(haStatus, functionIdentifierType, retryAttempts,
testSupport.getExecutablePool(),
@@ -182,7 +183,6 @@
() -> ignoreServerConnectivityException(() -> ExecuteRegionFunctionSingleHopOp.execute(
executablePool, testSupport.getRegion(),
executor, resultCollector, serverToFilterMap,
- retryAttempts,
testSupport.toBoolean(haStatus),
executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
testSupport.getRegion().getFullPath(), FUNCTION_NAME,
@@ -199,7 +199,6 @@
ignoreServerConnectivityException(
() -> ExecuteRegionFunctionSingleHopOp.execute(executablePool, testSupport.getRegion(),
executor, resultCollector, serverToFilterMap,
- retryAttempts,
function.isHA(),
executor1 -> new ExecuteRegionFunctionSingleHopOp.ExecuteRegionFunctionSingleHopOpImpl(
testSupport.getRegion().getFullPath(), function,
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java
new file mode 100644
index 0000000..901179f
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/PoolImplTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DURABLE_CLIENT_ID;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.pooling.ConnectionManagerImpl;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.gms.membership.HostAddress;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PoolFactoryImpl;
+import org.apache.geode.internal.cache.PoolManagerImpl;
+import org.apache.geode.internal.monitoring.ThreadsMonitoring;
+import org.apache.geode.internal.net.SSLConfigurationFactory;
+import org.apache.geode.internal.security.SecurableCommunicationChannel;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+
+@Category({ClientServerTest.class})
+public class PoolImplTest {
+
+ @Test
+ public void calculateRetryAttemptsDoesNotDecrementRetryCountForFailureWithUnexpectedSocketClose() {
+ List servers = mock(List.class);
+ when(servers.size()).thenReturn(1);
+ ConnectionSource connectionSource = mock(ConnectionSource.class);
+ when(connectionSource.getAllServers()).thenReturn(servers);
+ ServerConnectivityException serverConnectivityException =
+ mock(ServerConnectivityException.class);
+ when(serverConnectivityException.getMessage())
+ .thenReturn(ConnectionManagerImpl.UNEXPECTED_SOCKET_CLOSED_MSG);
+
+ PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS));
+ when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+ assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(1);
+ }
+
+ @Test
+ public void calculateRetryAttemptsDoesNotDecrementRetryCountForFailureDuringBorrowConnection() {
+ List servers = mock(List.class);
+ when(servers.size()).thenReturn(1);
+ ConnectionSource connectionSource = mock(ConnectionSource.class);
+ when(connectionSource.getAllServers()).thenReturn(servers);
+ ServerConnectivityException serverConnectivityException =
+ mock(ServerConnectivityException.class);
+ when(serverConnectivityException.getMessage())
+ .thenReturn(ConnectionManagerImpl.BORROW_CONN_ERROR_MSG);
+
+ PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS));
+ when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+ assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(1);
+ }
+
+ @Test
+ public void calculateRetryAttemptsDecrementsRetryCountForFailureAfterSendingTheRequest() {
+ List servers = mock(List.class);
+ when(servers.size()).thenReturn(1);
+ ConnectionSource connectionSource = mock(ConnectionSource.class);
+ when(connectionSource.getAllServers()).thenReturn(servers);
+ ServerConnectivityException serverConnectivityException =
+ mock(ServerConnectivityException.class);
+ when(serverConnectivityException.getMessage())
+ .thenReturn(ConnectionManagerImpl.SOCKET_TIME_OUT_MSG);
+
+ PoolImpl poolImpl = spy(getPool(PoolFactory.DEFAULT_RETRY_ATTEMPTS));
+ when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+ assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(0);
+ }
+
+  @Test
+  public void calculateRetryAttemptsReturnsTheRetryCountConfiguredWithPool() {
+    int retryCount = 1;
+    List servers = mock(List.class);
+    when(servers.size()).thenReturn(1);
+    ConnectionSource connectionSource = mock(ConnectionSource.class);
+    when(connectionSource.getAllServers()).thenReturn(servers);
+    ServerConnectivityException serverConnectivityException =
+        mock(ServerConnectivityException.class);
+    when(serverConnectivityException.getMessage()).thenReturn("Timeout Exception");
+
+    PoolImpl poolImpl = spy(getPool(retryCount));
+    when(poolImpl.getConnectionSource()).thenReturn(connectionSource);
+
+    assertThat(poolImpl.calculateRetryAttempts(serverConnectivityException)).isEqualTo(retryCount);
+  }
+
+ private PoolImpl getPool(int retryAttemptsAttribute) {
+ final DistributionConfig distributionConfig = mock(DistributionConfig.class);
+ doReturn(new SecurableCommunicationChannel[] {}).when(distributionConfig)
+ .getSecurableCommunicationChannels();
+
+ SSLConfigurationFactory.setDistributionConfig(distributionConfig);
+
+ final Properties properties = new Properties();
+ properties.put(DURABLE_CLIENT_ID, "1");
+
+ final Statistics statistics = mock(Statistics.class);
+
+ final PoolFactoryImpl.PoolAttributes poolAttributes =
+ mock(PoolFactoryImpl.PoolAttributes.class);
+
+ /*
+ * These are the minimum pool attributes required
+ * so that basic validation and setup completes successfully. The values of
+ * these attributes have no importance to the assertions of the test itself.
+ */
+ doReturn(1).when(poolAttributes).getMaxConnections();
+ doReturn((long) 10e8).when(poolAttributes).getPingInterval();
+ doReturn(retryAttemptsAttribute).when(poolAttributes).getRetryAttempts();
+
+ final CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+
+ final InternalCache internalCache = mock(InternalCache.class);
+ doReturn(cancelCriterion).when(internalCache).getCancelCriterion();
+
+ final InternalDistributedSystem internalDistributedSystem =
+ mock(InternalDistributedSystem.class);
+ doReturn(distributionConfig).when(internalDistributedSystem).getConfig();
+ doReturn(properties).when(internalDistributedSystem).getProperties();
+ doReturn(statistics).when(internalDistributedSystem).createAtomicStatistics(any(), anyString());
+
+ final PoolManagerImpl poolManager = mock(PoolManagerImpl.class);
+ doReturn(true).when(poolManager).isNormal();
+
+ final ThreadsMonitoring tMonitoring = mock(ThreadsMonitoring.class);
+
+ return PoolImpl.create(poolManager, "pool", poolAttributes, new LinkedList<HostAddress>(),
+ internalDistributedSystem, internalCache, tMonitoring);
+ }
+
+}
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
index ac315bc..98d39d6 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeDUnit.java
@@ -14,67 +14,39 @@
*/
package org.apache.geode.cache.lucene;
-import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
-import static org.apache.geode.test.dunit.Assert.fail;
-import static org.junit.Assert.assertEquals;
import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.logging.log4j.Logger;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.apache.geode.cache.GemFireCache;
import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
-import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache30.CacheSerializableRunnable;
-import org.apache.geode.distributed.Locator;
-import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.AvailablePortHelper;
-import org.apache.geode.internal.cache.GemFireCacheImpl;
-import org.apache.geode.internal.serialization.Version;
-import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.internal.DUnitLauncher;
-import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
import org.apache.geode.test.version.TestVersion;
import org.apache.geode.test.version.VersionManager;
@RunWith(Parameterized.class)
@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
-public abstract class LuceneSearchWithRollingUpgradeDUnit extends JUnit4DistributedTestCase {
+public abstract class LuceneSearchWithRollingUpgradeDUnit
+ extends LuceneSearchWithRollingUpgradeTestBase {
- @Parameterized.Parameters(name = "from_v{0}, with reindex={1}")
+ @Parameterized.Parameters(name = "from_v{0}, with reindex={1}, singleHopEnabled={2}")
public static Collection<Object[]> data() {
Collection<String> luceneVersions = getLuceneVersions();
Collection<Object[]> rval = new ArrayList<>();
luceneVersions.forEach(v -> {
- rval.add(new Object[] {v, true});
- rval.add(new Object[] {v, false});
+ rval.add(new Object[] {v, true, true});
+ rval.add(new Object[] {v, false, true});
});
return rval;
}
@@ -84,6 +56,10 @@
// Lucene Compatibility checks start with Apache Geode v1.2.0
// Removing the versions older than v1.2.0
result.removeIf(s -> TestVersion.compare(s, "1.2.0") < 0);
+
+ // The changes relating to GEODE-7258 are not applied on 1.10.0, so rolling
+ // upgrade from 1.10.0 is skipped. The change was verified by rolling from develop to develop.
+ result.removeIf(s -> TestVersion.compare(s, "1.10.0") == 0);
if (result.size() < 1) {
throw new RuntimeException("No older versions of Geode were found to test against");
} else {
@@ -92,15 +68,6 @@
return result;
}
- private File[] testingDirs = new File[3];
-
- protected static String INDEX_NAME = "index";
-
- private static String diskDir = "LuceneSearchWithRollingUpgradeDUnit";
-
- // Each vm will have a cache object
- protected static Object cache;
-
// the old version of Geode we're testing against
@Parameterized.Parameter()
public String oldVersion;
@@ -108,137 +75,8 @@
@Parameterized.Parameter(1)
public Boolean reindex;
- private void deleteVMFiles() {
- System.out.println("deleting files in vm" + VM.getCurrentVMNum());
- File pwd = new File(".");
- for (File entry : pwd.listFiles()) {
- try {
- if (entry.isDirectory()) {
- FileUtils.deleteDirectory(entry);
- } else {
- entry.delete();
- }
- } catch (Exception e) {
- System.out.println("Could not delete " + entry + ": " + e.getMessage());
- }
- }
- }
-
- private void deleteWorkingDirFiles() {
- Invoke.invokeInEveryVM("delete files", () -> deleteVMFiles());
- }
-
- @Override
- public void postSetUp() {
- deleteWorkingDirFiles();
- IgnoredException.addIgnoredException(
- "cluster configuration service not available|ConflictingPersistentDataException");
- }
-
-
- Properties getLocatorPropertiesPre91(String locatorsString) {
- Properties props = new Properties();
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
- props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
- props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel);
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
- return props;
- }
-
- VM rollClientToCurrentAndCreateRegion(VM oldClient, ClientRegionShortcut shortcut,
- String regionName, String[] hostNames, int[] locatorPorts, boolean subscriptionEnabled) {
- VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled);
- // recreate region on "rolled" client
- invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient);
- return rollClient;
- }
-
- private VM rollClientToCurrent(VM oldClient, String[] hostNames, int[] locatorPorts,
- boolean subscriptionEnabled) {
- oldClient.invoke(invokeCloseCache());
- VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId());
- rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts,
- subscriptionEnabled));
- rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
- return rollClient;
- }
-
- CacheSerializableRunnable invokeCreateClientRegion(final String regionName,
- final ClientRegionShortcut shortcut) {
- return new CacheSerializableRunnable("execute: createClientRegion") {
- @Override
- public void run2() {
- try {
- createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, regionName,
- shortcut);
- } catch (Exception e) {
- fail("Error creating client region", e);
- }
- }
- };
- }
-
- private static void createClientRegion(GemFireCache cache, String regionName,
- ClientRegionShortcut shortcut) {
- ClientRegionFactory rf = ((ClientCache) cache).createClientRegionFactory(shortcut);
- rf.create(regionName);
- }
-
- CacheSerializableRunnable invokeStartCacheServer(final int port) {
- return new CacheSerializableRunnable("execute: startCacheServer") {
- @Override
- public void run2() {
- try {
- startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeDUnit.cache, port);
- } catch (Exception e) {
- fail("Error creating cache", e);
- }
- }
- };
- }
-
- private static void startCacheServer(GemFireCache cache, int port) throws Exception {
- CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer();
- cacheServer.setPort(port);
- cacheServer.start();
- }
-
- CacheSerializableRunnable invokeCreateClientCache(final Properties systemProperties,
- final String[] hosts, final int[] ports, boolean subscriptionEnabled) {
- return new CacheSerializableRunnable("execute: createClientCache") {
- @Override
- public void run2() {
- try {
- LuceneSearchWithRollingUpgradeDUnit.cache =
- createClientCache(systemProperties, hosts, ports, subscriptionEnabled);
- } catch (Exception e) {
- fail("Error creating client cache", e);
- }
- }
- };
- }
-
- Properties getClientSystemProperties() {
- Properties p = new Properties();
- p.setProperty("mcast-port", "0");
- return p;
- }
-
-
- private static ClientCache createClientCache(Properties systemProperties, String[] hosts,
- int[] ports, boolean subscriptionEnabled) {
- ClientCacheFactory cf = new ClientCacheFactory(systemProperties);
- if (subscriptionEnabled) {
- cf.setPoolSubscriptionEnabled(true);
- cf.setPoolSubscriptionRedundancy(-1);
- }
- int hostsLength = hosts.length;
- for (int i = 0; i < hostsLength; i++) {
- cf.addPoolLocator(hosts[i], ports[i]);
- }
-
- return cf.create();
- }
+ @Parameterized.Parameter(2)
+ public Boolean singleHopEnabled;
// We start an "old" locator and old servers
// We roll the locator
@@ -303,7 +141,7 @@
locatorString);
server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType,
- testingDirs[0], shortcutName, regionName, locatorPorts);
+ testingDirs[0], shortcutName, regionName, locatorPorts, reindex);
verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server1);
expectedRegionSize += 5;
putSerializableObjectAndVerifyLuceneQueryResult(server1, regionName, expectedRegionSize, 5,
@@ -313,7 +151,7 @@
20, server1, server3);
server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType,
- testingDirs[1], shortcutName, regionName, locatorPorts);
+ testingDirs[1], shortcutName, regionName, locatorPorts, reindex);
verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server2);
expectedRegionSize += 5;
putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
@@ -323,7 +161,7 @@
30, server2, server3);
server3 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server3, regionType,
- testingDirs[2], shortcutName, regionName, locatorPorts);
+ testingDirs[2], shortcutName, regionName, locatorPorts, reindex);
verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, server3);
putSerializableObjectAndVerifyLuceneQueryResult(server3, regionName, expectedRegionSize, 15,
25, server1, server2);
@@ -340,627 +178,4 @@
}
}
- void putSerializableObjectAndVerifyLuceneQueryResult(VM putter, String regionName,
- int expectedRegionSize, int start, int end, VM... vms) throws Exception {
- // do puts
- putSerializableObject(putter, regionName, start, end);
-
- // verify present in others
- verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, vms);
- }
-
- void putSerializableObject(VM putter, String regionName, int start, int end)
- throws Exception {
- for (int i = start; i < end; i++) {
- Class aClass = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.cache.query.data.Portfolio");
- Constructor portfolioConstructor = aClass.getConstructor(int.class);
- Object serializableObject = portfolioConstructor.newInstance(i);
- putter.invoke(invokePut(regionName, i, serializableObject));
- }
- }
-
- private void waitForRegionToHaveExpectedSize(String regionName, int expectedRegionSize) {
- await().untilAsserted(() -> {
- Object region =
- cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
- int regionSize = (int) region.getClass().getMethod("size").invoke(region);
- assertEquals("Region size not as expected after 60 seconds", expectedRegionSize,
- regionSize);
- });
- }
-
- void verifyLuceneQueryResults(String regionName, int expectedRegionSize)
- throws Exception {
- Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
- Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
- Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
- luceneService.getClass()
- .getMethod("waitUntilFlushed", String.class, String.class, long.class, TimeUnit.class)
- .invoke(luceneService, INDEX_NAME, regionName, 60, TimeUnit.SECONDS);
- Method createLuceneQueryFactoryMethod =
- luceneService.getClass().getMethod("createLuceneQueryFactory");
- createLuceneQueryFactoryMethod.setAccessible(true);
- Object luceneQueryFactory = createLuceneQueryFactoryMethod.invoke(luceneService);
- Object luceneQuery = luceneQueryFactory.getClass()
- .getMethod("create", String.class, String.class, String.class, String.class)
- .invoke(luceneQueryFactory, INDEX_NAME, regionName, "active", "status");
-
- Collection resultsActive = executeLuceneQuery(luceneQuery);
-
- luceneQuery = luceneQueryFactory.getClass()
- .getMethod("create", String.class, String.class, String.class, String.class)
- .invoke(luceneQueryFactory, INDEX_NAME, regionName, "inactive", "status");
-
- Collection resultsInactive = executeLuceneQuery(luceneQuery);
-
- assertEquals("Result size not as expected ", expectedRegionSize,
- resultsActive.size() + resultsInactive.size());
- }
-
- private Collection executeLuceneQuery(Object luceneQuery)
- throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
- Collection results = null;
- int retryCount = 10;
- while (true) {
- try {
- results = (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
- break;
- } catch (Exception ex) {
- if (!ex.getCause().getMessage().contains("currently indexing")) {
- throw ex;
- }
- if (--retryCount == 0) {
- throw ex;
- }
- }
- }
- return results;
-
- }
-
- private void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize,
- VM... vms) {
- for (VM vm : vms) {
- vm.invoke(() -> waitForRegionToHaveExpectedSize(regionName, expectedRegionSize));
- vm.invoke(() -> verifyLuceneQueryResults(regionName, expectedRegionSize));
- }
-
- }
-
- void invokeRunnableInVMs(CacheSerializableRunnable runnable, VM... vms) {
- for (VM vm : vms) {
- vm.invoke(runnable);
- }
- }
-
- // Used to close cache and make sure we attempt on all vms even if some do not have a cache
- void invokeRunnableInVMs(boolean catchErrors, CacheSerializableRunnable runnable,
- VM... vms) {
- for (VM vm : vms) {
- try {
- vm.invoke(runnable);
- } catch (Exception e) {
- if (!catchErrors) {
- throw e;
- }
- }
- }
- }
-
- private VM rollServerToCurrent(VM oldServer, int[] locatorPorts) {
- // Roll the server
- oldServer.invoke(invokeCloseCache());
- VM rollServer = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldServer.getId());
- rollServer.invoke(invokeCreateCache(locatorPorts == null ? getSystemPropertiesPost71()
- : getSystemPropertiesPost71(locatorPorts)));
- rollServer.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
- return rollServer;
- }
-
- VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer, String regionType,
- File diskdir, String shortcutName, String regionName, int[] locatorPorts) {
- VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
- return createLuceneIndexAndRegionOnRolledServer(regionType, diskdir, shortcutName, regionName,
- rollServer);
- }
-
- private VM createLuceneIndexAndRegionOnRolledServer(String regionType, File diskdir,
- String shortcutName, String regionName, VM rollServer) {
-
- Boolean serializeIt = reindex;
- rollServer.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = serializeIt);
- rollServer.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
- // recreate region on "rolled" server
- if ((regionType.equals("persistentPartitioned"))) {
- CacheSerializableRunnable runnable =
- invokeCreatePersistentPartitionedRegion(regionName, diskdir);
- invokeRunnableInVMs(runnable, rollServer);
- } else {
- invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
- }
- rollServer.invoke(invokeRebalance());
- return rollServer;
- }
-
- VM rollServerToCurrentAndCreateRegionOnly(VM oldServer, String regionType, File diskdir,
- String shortcutName, String regionName, int[] locatorPorts) {
- VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
- // recreate region on "rolled" server
- if ((regionType.equals("persistentPartitioned"))) {
- CacheSerializableRunnable runnable =
- invokeCreatePersistentPartitionedRegion(regionName, diskdir);
- invokeRunnableInVMs(runnable, rollServer);
- } else {
- invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
- }
- rollServer.invoke(invokeRebalance());
- return rollServer;
- }
-
- VM rollLocatorToCurrent(VM oldLocator, final String serverHostName, final int port,
- final String testName, final String locatorString) {
- // Roll the locator
- oldLocator.invoke(invokeStopLocator());
- VM rollLocator = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldLocator.getId());
- final Properties props = new Properties();
- props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
- rollLocator.invoke(invokeStartLocator(serverHostName, port, testName, locatorString, props));
- return rollLocator;
- }
-
- // Due to licensing changes
- private Properties getSystemPropertiesPost71() {
- Properties props = getSystemProperties();
- return props;
- }
-
- // Due to licensing changes
- private Properties getSystemPropertiesPost71(int[] locatorPorts) {
- Properties props = getSystemProperties(locatorPorts);
- return props;
- }
-
- private Properties getSystemProperties() {
- Properties props = DistributedTestUtils.getAllDistributedSystemProperties(new Properties());
- props.remove("disable-auto-reconnect");
- props.remove(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME);
- props.remove(DistributionConfig.LOCK_MEMORY_NAME);
- return props;
- }
-
- Properties getSystemProperties(int[] locatorPorts) {
- Properties p = new Properties();
- String locatorString = getLocatorString(locatorPorts);
- p.setProperty("locators", locatorString);
- p.setProperty("mcast-port", "0");
- return p;
- }
-
- static String getLocatorString(int locatorPort) {
- String locatorString = getDUnitLocatorAddress() + "[" + locatorPort + "]";
- return locatorString;
- }
-
- static String getLocatorString(int[] locatorPorts) {
- StringBuilder locatorString = new StringBuilder();
- int numLocators = locatorPorts.length;
- for (int i = 0; i < numLocators; i++) {
- locatorString.append(getLocatorString(locatorPorts[i]));
- if (i + 1 < numLocators) {
- locatorString.append(",");
- }
- }
- return locatorString.toString();
- }
-
- private CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port,
- final String testName, final String locatorsString, final Properties props) {
- return new CacheSerializableRunnable("execute: startLocator") {
- @Override
- public void run2() {
- try {
- startLocator(serverHostName, port, testName, locatorsString, props);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port,
- final Properties props) {
- return new CacheSerializableRunnable("execute: startLocator") {
- @Override
- public void run2() {
- try {
- startLocator(serverHostName, port, props);
- } catch (Exception e) {
- fail("Error starting locators", e);
- }
- }
- };
- }
-
- CacheSerializableRunnable invokeCreateCache(final Properties systemProperties) {
- return new CacheSerializableRunnable("execute: createCache") {
- @Override
- public void run2() {
- try {
- LuceneSearchWithRollingUpgradeDUnit.cache = createCache(systemProperties);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private CacheSerializableRunnable invokeAssertVersion(final short version) {
- return new CacheSerializableRunnable("execute: assertVersion") {
- @Override
- public void run2() {
- try {
- assertVersion(LuceneSearchWithRollingUpgradeDUnit.cache, version);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- CacheSerializableRunnable invokeCreateRegion(final String regionName,
- final String shortcutName) {
- return new CacheSerializableRunnable("execute: createRegion") {
- @Override
- public void run2() {
- try {
- createRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, shortcutName);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private CacheSerializableRunnable invokeCreatePersistentPartitionedRegion(final String regionName,
- final File diskstore) {
- return new CacheSerializableRunnable("execute: createPersistentPartitonedRegion") {
- @Override
- public void run2() {
- try {
- createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeDUnit.cache, regionName,
- diskstore);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private CacheSerializableRunnable invokePut(final String regionName, final Object key,
- final Object value) {
- return new CacheSerializableRunnable("execute: put") {
- @Override
- public void run2() {
- try {
- put(LuceneSearchWithRollingUpgradeDUnit.cache, regionName, key, value);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- CacheSerializableRunnable invokeStopLocator() {
- return new CacheSerializableRunnable("execute: stopLocator") {
- @Override
- public void run2() {
- try {
- stopLocator();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- CacheSerializableRunnable invokeCloseCache() {
- return new CacheSerializableRunnable("execute: closeCache") {
- @Override
- public void run2() {
- try {
- closeCache(LuceneSearchWithRollingUpgradeDUnit.cache);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private CacheSerializableRunnable invokeRebalance() {
- return new CacheSerializableRunnable("execute: rebalance") {
- @Override
- public void run2() {
- try {
- rebalance(LuceneSearchWithRollingUpgradeDUnit.cache);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- }
-
- private void deleteDiskStores() {
- try {
- FileUtils.deleteDirectory(new File(diskDir).getAbsoluteFile());
- } catch (IOException e) {
- throw new Error("Error deleting files", e);
- }
- }
-
- private static Object createCache(Properties systemProperties) throws Exception {
-
- Class distConfigClass = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.distributed.internal.DistributionConfigImpl");
- boolean disableConfig = true;
- try {
- distConfigClass.getDeclaredField("useSharedConfiguration");
- } catch (NoSuchFieldException e) {
- disableConfig = false;
- }
- if (disableConfig) {
- systemProperties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
- }
-
- Class cacheFactoryClass = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.cache.CacheFactory");
- Constructor constructor = cacheFactoryClass.getConstructor(Properties.class);
- constructor.setAccessible(true);
- Object cacheFactory = constructor.newInstance(systemProperties);
-
- Method createMethod = cacheFactoryClass.getMethod("create");
- createMethod.setAccessible(true);
- Object cache = createMethod.invoke(cacheFactory);
- return cache;
- }
-
- private static Object getRegion(Object cache, String regionName) throws Exception {
- return cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
- }
-
- private static Object put(Object cache, String regionName, Object key, Object value)
- throws Exception {
- Object region = getRegion(cache, regionName);
- return region.getClass().getMethod("put", Object.class, Object.class).invoke(region, key,
- value);
- }
-
- private static void createRegion(Object cache, String regionName, String shortcutName)
- throws Exception {
- Class aClass = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.cache.RegionShortcut");
- Object[] enumConstants = aClass.getEnumConstants();
- Object shortcut = null;
- int length = enumConstants.length;
- for (int i = 0; i < length; i++) {
- Object constant = enumConstants[i];
- if (((Enum) constant).name().equals(shortcutName)) {
- shortcut = constant;
- break;
- }
- }
-
- Method createRegionFactoryMethod = cache.getClass().getMethod("createRegionFactory", aClass);
- createRegionFactoryMethod.setAccessible(true);
- Object regionFactory = createRegionFactoryMethod.invoke(cache, shortcut);
- Method createMethod = regionFactory.getClass().getMethod("create", String.class);
- createMethod.setAccessible(true);
- createMethod.invoke(regionFactory, regionName);
- }
-
- static void createLuceneIndex(Object cache, String regionName, String indexName)
- throws Exception {
- Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
- Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
- Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
- Method createLuceneIndexFactoryMethod =
- luceneService.getClass().getMethod("createIndexFactory");
- createLuceneIndexFactoryMethod.setAccessible(true);
- Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
- luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
- "status");
- luceneIndexFactory.getClass().getMethod("create", String.class, String.class)
- .invoke(luceneIndexFactory, indexName, regionName);
- }
-
- static void createLuceneIndexOnExistingRegion(Object cache, String regionName,
- String indexName) throws Exception {
- Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
- Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
- Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
- Method createLuceneIndexFactoryMethod =
- luceneService.getClass().getMethod("createIndexFactory");
- createLuceneIndexFactoryMethod.setAccessible(true);
- Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
- luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
- "status");
- luceneIndexFactory.getClass().getMethod("create", String.class, String.class, boolean.class)
- .invoke(luceneIndexFactory, indexName, regionName, true);
- }
-
- private static void createPersistentPartitonedRegion(Object cache, String regionName,
- File diskStore) throws Exception {
- Object store = cache.getClass().getMethod("findDiskStore", String.class).invoke(cache, "store");
- Class dataPolicyObject = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.cache.DataPolicy");
- Object dataPolicy = dataPolicyObject.getField("PERSISTENT_PARTITION").get(null);
- if (store == null) {
- Object dsf = cache.getClass().getMethod("createDiskStoreFactory").invoke(cache);
- dsf.getClass().getMethod("setMaxOplogSize", long.class).invoke(dsf, 1L);
- dsf.getClass().getMethod("setDiskDirs", File[].class).invoke(dsf,
- new Object[] {new File[] {diskStore.getAbsoluteFile()}});
- dsf.getClass().getMethod("create", String.class).invoke(dsf, "store");
- }
- Object rf = cache.getClass().getMethod("createRegionFactory").invoke(cache);
- rf.getClass().getMethod("setDiskStoreName", String.class).invoke(rf, "store");
- rf.getClass().getMethod("setDataPolicy", dataPolicy.getClass()).invoke(rf, dataPolicy);
- rf.getClass().getMethod("create", String.class).invoke(rf, regionName);
- }
-
- private static void assertVersion(Object cache, short ordinal) throws Exception {
- Class idmClass = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.distributed.internal.membership.InternalDistributedMember");
- Method getDSMethod = cache.getClass().getMethod("getDistributedSystem");
- getDSMethod.setAccessible(true);
- Object ds = getDSMethod.invoke(cache);
-
- Method getDistributedMemberMethod = ds.getClass().getMethod("getDistributedMember");
- getDistributedMemberMethod.setAccessible(true);
- Object member = getDistributedMemberMethod.invoke(ds);
- Method getVersionObjectMethod = member.getClass().getMethod("getVersionObject");
- getVersionObjectMethod.setAccessible(true);
- Object thisVersion = getVersionObjectMethod.invoke(member);
- Method getOrdinalMethod = thisVersion.getClass().getMethod("ordinal");
- getOrdinalMethod.setAccessible(true);
- short thisOrdinal = (Short) getOrdinalMethod.invoke(thisVersion);
- if (ordinal != thisOrdinal) {
- throw new Error(
- "Version ordinal:" + thisOrdinal + " was not the expected ordinal of:" + ordinal);
- }
- }
-
- private static void stopCacheServers(Object cache) throws Exception {
- Method getCacheServersMethod = cache.getClass().getMethod("getCacheServers");
- getCacheServersMethod.setAccessible(true);
- List cacheServers = (List) getCacheServersMethod.invoke(cache);
- Method stopMethod = null;
- for (Object cs : cacheServers) {
- if (stopMethod == null) {
- stopMethod = cs.getClass().getMethod("stop");
- }
- stopMethod.setAccessible(true);
- stopMethod.invoke(cs);
- }
- }
-
- private static void closeCache(Object cache) throws Exception {
- if (cache == null) {
- return;
- }
- Method isClosedMethod = cache.getClass().getMethod("isClosed");
- isClosedMethod.setAccessible(true);
- boolean cacheClosed = (Boolean) isClosedMethod.invoke(cache);
- if (cache != null && !cacheClosed) {
- stopCacheServers(cache);
- Method method = cache.getClass().getMethod("close");
- method.setAccessible(true);
- method.invoke(cache);
- long startTime = System.currentTimeMillis();
- while (!cacheClosed && System.currentTimeMillis() - startTime < 30000) {
- try {
- Thread.sleep(1000);
- Method cacheClosedMethod = cache.getClass().getMethod("isClosed");
- cacheClosedMethod.setAccessible(true);
- cacheClosed = (Boolean) cacheClosedMethod.invoke(cache);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
-
- private static void rebalance(Object cache) throws Exception {
- Method getRMMethod = cache.getClass().getMethod("getResourceManager");
- getRMMethod.setAccessible(true);
- Object manager = getRMMethod.invoke(cache);
-
- Method createRebalanceFactoryMethod = manager.getClass().getMethod("createRebalanceFactory");
- createRebalanceFactoryMethod.setAccessible(true);
- Object rebalanceFactory = createRebalanceFactoryMethod.invoke(manager);
- Method m = rebalanceFactory.getClass().getMethod("start");
- m.setAccessible(true);
- Object op = m.invoke(rebalanceFactory);
-
- // Wait until the rebalance is complete
- try {
- Method getResultsMethod = op.getClass().getMethod("getResults");
- getResultsMethod.setAccessible(true);
- Object results = getResultsMethod.invoke(op);
- Method getTotalTimeMethod = results.getClass().getMethod("getTotalTime");
- getTotalTimeMethod.setAccessible(true);
- System.out.println("Took " + getTotalTimeMethod.invoke(results) + " milliseconds\n");
- Method getTotalBucketsMethod = results.getClass().getMethod("getTotalBucketTransferBytes");
- getTotalBucketsMethod.setAccessible(true);
- System.out.println("Transfered " + getTotalBucketsMethod.invoke(results) + "bytes\n");
- } catch (Exception e) {
- Thread.currentThread().interrupt();
- throw e;
- }
- }
-
- /**
- * Starts a locator with given configuration.
- */
- private static void startLocator(final String serverHostName, final int port,
- final String testName, final String locatorsString, final Properties props) throws Exception {
- props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
- props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
- Logger logger = LogService.getLogger();
- props.setProperty(DistributionConfig.LOG_LEVEL_NAME, logger.getLevel().name());
-
- InetAddress bindAddr;
- try {
- bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
- } catch (UnknownHostException uhe) {
- throw new Error("While resolving bind address ", uhe);
- }
-
- File logFile = new File(testName + "-locator" + port + ".log");
- Class locatorClass = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.distributed.Locator");
- Method startLocatorAndDSMethod =
- locatorClass.getMethod("startLocatorAndDS", int.class, File.class, InetAddress.class,
- Properties.class, boolean.class, boolean.class, String.class);
- startLocatorAndDSMethod.setAccessible(true);
- startLocatorAndDSMethod.invoke(null, port, logFile, bindAddr, props, true, true, null);
- }
-
- private static void startLocator(final String serverHostName, final int port, Properties props)
- throws Exception {
-
-
- InetAddress bindAddr = null;
- try {
- bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
- } catch (UnknownHostException uhe) {
- throw new Error("While resolving bind address ", uhe);
- }
-
- Locator.startLocatorAndDS(port, new File(""), bindAddr, props, true, true, null);
- Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available
- }
-
- private static void stopLocator() throws Exception {
- Class internalLocatorClass = Thread.currentThread().getContextClassLoader()
- .loadClass("org.apache.geode.distributed.internal.InternalLocator");
- Method locatorMethod = internalLocatorClass.getMethod("getLocator");
- locatorMethod.setAccessible(true);
- Object locator = locatorMethod.invoke(null);
- Method stopLocatorMethod = locator.getClass().getMethod("stop");
- stopLocatorMethod.setAccessible(true);
- stopLocatorMethod.invoke(locator);
- }
-
- /**
- * Get the port that the standard dunit locator is listening on.
- *
- * @return locator address
- */
- private static String getDUnitLocatorAddress() {
- return Host.getHost(0).getHostName();
- }
-
}
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java
new file mode 100644
index 0000000..b08d875
--- /dev/null
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/LuceneSearchWithRollingUpgradeTestBase.java
@@ -0,0 +1,883 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.GemFireCache;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.internal.ClientMetadataService;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache30.CacheSerializableRunnable;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.Invoke;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.internal.DUnitLauncher;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.version.VersionManager;
+
+public abstract class LuceneSearchWithRollingUpgradeTestBase extends JUnit4DistributedTestCase {
+
+ protected File[] testingDirs = new File[3];
+
+ protected static String INDEX_NAME = "index";
+
+ protected static String diskDir = "LuceneSearchWithRollingUpgradeTestBase";
+
+ // Each vm will have a cache object
+ protected static Object cache;
+
+ protected void deleteVMFiles() {
+ System.out.println("deleting files in vm" + VM.getCurrentVMNum());
+ File pwd = new File(".");
+ for (File entry : pwd.listFiles()) {
+ try {
+ if (entry.isDirectory()) {
+ FileUtils.deleteDirectory(entry);
+ } else {
+ entry.delete();
+ }
+ } catch (Exception e) {
+ System.out.println("Could not delete " + entry + ": " + e.getMessage());
+ }
+ }
+ }
+
+ protected void deleteWorkingDirFiles() {
+ Invoke.invokeInEveryVM("delete files", () -> deleteVMFiles());
+ }
+
  @Override
  public void postSetUp() {
    // Start every VM from a clean working directory, and ignore log noise that
    // is expected while mixed-version members join/leave during a rolling
    // upgrade.
    deleteWorkingDirFiles();
    IgnoredException.addIgnoredException(
        "cluster configuration service not available|ConflictingPersistentDataException");
  }
+
+
+ Properties getLocatorPropertiesPre91(String locatorsString) {
+ Properties props = new Properties();
+ props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+ props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
+ props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel);
+ props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
+ return props;
+ }
+
  VM rollClientToCurrentAndCreateRegion(VM oldClient,
      ClientRegionShortcut shortcut,
      String regionName, String[] hostNames, int[] locatorPorts,
      boolean subscriptionEnabled, boolean singleHopEnabled) {
    // Bounce the old-version client to the current version, then recreate its
    // client region so subsequent operations go through the rolled client.
    VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled,
        singleHopEnabled);
    // recreate region on "rolled" client
    invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient);
    return rollClient;
  }
+
  protected VM rollClientToCurrent(VM oldClient, String[] hostNames,
      int[] locatorPorts,
      boolean subscriptionEnabled, boolean singleHopEnabled) {
    // Close the old-version client cache, restart the same dunit VM id on the
    // current version, create a fresh client cache there and verify its
    // version ordinal.
    oldClient.invoke(invokeCloseCache());
    VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId());
    rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts,
        subscriptionEnabled, singleHopEnabled));
    rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
    return rollClient;
  }
+
+ CacheSerializableRunnable invokeCreateClientRegion(final String regionName,
+ final ClientRegionShortcut shortcut) {
+ return new CacheSerializableRunnable("execute: createClientRegion") {
+ @Override
+ public void run2() {
+ try {
+ createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeTestBase.cache,
+ regionName,
+ shortcut);
+ } catch (Exception e) {
+ fail("Error creating client region", e);
+ }
+ }
+ };
+ }
+
+ protected static void createClientRegion(GemFireCache cache, String regionName,
+ ClientRegionShortcut shortcut) {
+ ClientRegionFactory rf = ((ClientCache) cache).createClientRegionFactory(shortcut);
+ rf.create(regionName);
+ }
+
+ CacheSerializableRunnable invokeStartCacheServer(final int port) {
+ return new CacheSerializableRunnable("execute: startCacheServer") {
+ @Override
+ public void run2() {
+ try {
+ startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeTestBase.cache, port);
+ } catch (Exception e) {
+ fail("Error creating cache", e);
+ }
+ }
+ };
+ }
+
  protected static void startCacheServer(GemFireCache cache, int port) throws Exception {
    // Add a cache server on the given port and start accepting client
    // connections immediately.
    CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer();
    cacheServer.setPort(port);
    cacheServer.start();
  }
+
+ CacheSerializableRunnable invokeCreateClientCache(
+ final Properties systemProperties,
+ final String[] hosts, final int[] ports, boolean subscriptionEnabled,
+ boolean singleHopEnabled) {
+ return new CacheSerializableRunnable("execute: createClientCache") {
+ @Override
+ public void run2() {
+ try {
+ LuceneSearchWithRollingUpgradeTestBase.cache =
+ createClientCache(systemProperties, hosts, ports, subscriptionEnabled,
+ singleHopEnabled);
+ } catch (Exception e) {
+ fail("Error creating client cache", e);
+ }
+ }
+ };
+ }
+
+ Properties getClientSystemProperties() {
+ Properties p = new Properties();
+ p.setProperty("mcast-port", "0");
+ return p;
+ }
+
+
+ protected static ClientCache createClientCache(Properties systemProperties,
+ String[] hosts,
+ int[] ports, boolean subscriptionEnabled, boolean singleHopEnabled) {
+ ClientCacheFactory cf = new ClientCacheFactory(systemProperties);
+ if (subscriptionEnabled) {
+ cf.setPoolSubscriptionEnabled(true);
+ cf.setPoolSubscriptionRedundancy(-1);
+ }
+ cf.setPoolPRSingleHopEnabled(singleHopEnabled);
+ int hostsLength = hosts.length;
+ for (int i = 0; i < hostsLength; i++) {
+ cf.addPoolLocator(hosts[i], ports[i]);
+ }
+
+ return cf.create();
+ }
+
+
  // Put Portfolio objects for keys [start, end) via the putter VM, then verify
  // in each of the given VMs that the region and Lucene query results reach
  // the expected size.
  void putSerializableObjectAndVerifyLuceneQueryResult(VM putter, String regionName,
      int expectedRegionSize, int start, int end, VM... vms) throws Exception {
    // do puts
    putSerializableObject(putter, regionName, start, end);

    // verify present in others
    verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, vms);
  }
+
+ void putSerializableObject(VM putter, String regionName, int start, int end)
+ throws Exception {
+ for (int i = start; i < end; i++) {
+ Class aClass = Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.cache.query.data.Portfolio");
+ Constructor portfolioConstructor = aClass.getConstructor(int.class);
+ Object serializableObject = portfolioConstructor.newInstance(i);
+ putter.invoke(invokePut(regionName, i, serializableObject));
+ }
+ }
+
  private void waitForRegionToHaveExpectedSize(String regionName, int expectedRegionSize) {
    // Poll (via GeodeAwaitility) until the region reports the expected entry
    // count; region access is reflective because this VM's cache may have
    // been created by an older product version's classloader.
    await().untilAsserted(() -> {
      Object region =
          cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
      int regionSize = (int) region.getClass().getMethod("size").invoke(region);
      assertEquals("Region size not as expected after 60 seconds", expectedRegionSize,
          regionSize);
    });
  }
+
  void updateClientSingleHopMetadata(String regionName) {
    // Force a refresh of the client's PR single-hop metadata for the region,
    // then wait until the metadata service reports it as stable.
    ClientMetadataService cms = ((InternalCache) cache)
        .getClientMetadataService();
    cms.scheduleGetPRMetaData(
        (LocalRegion) ((InternalCache) cache).getRegion(regionName), true);
    GeodeAwaitility.await("Awaiting ClientMetadataService.isMetadataStable()")
        .untilAsserted(() -> assertThat(cms.isMetadataStable()).isTrue());
  }
+
  void verifyLuceneQueryResults(String regionName, int expectedRegionSize)
      throws Exception {
    // All Lucene access is reflective so the check also runs in VMs on an
    // older product version. Flush the index first so queries see all puts.
    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
    luceneService.getClass()
        .getMethod("waitUntilFlushed", String.class, String.class, long.class, TimeUnit.class)
        .invoke(luceneService, INDEX_NAME, regionName, 60, TimeUnit.SECONDS);
    Method createLuceneQueryFactoryMethod =
        luceneService.getClass().getMethod("createLuceneQueryFactory");
    createLuceneQueryFactoryMethod.setAccessible(true);
    Object luceneQueryFactory = createLuceneQueryFactoryMethod.invoke(luceneService);
    // Query "status" for both values; presumably every Portfolio is either
    // active or inactive, so the two result sets together should cover the
    // whole region — TODO confirm against Portfolio's data model.
    Object luceneQuery = luceneQueryFactory.getClass()
        .getMethod("create", String.class, String.class, String.class, String.class)
        .invoke(luceneQueryFactory, INDEX_NAME, regionName, "active", "status");

    Collection resultsActive = executeLuceneQuery(luceneQuery);

    luceneQuery = luceneQueryFactory.getClass()
        .getMethod("create", String.class, String.class, String.class, String.class)
        .invoke(luceneQueryFactory, INDEX_NAME, regionName, "inactive", "status");

    Collection resultsInactive = executeLuceneQuery(luceneQuery);

    assertEquals("Result size not as expected ", expectedRegionSize,
        resultsActive.size() + resultsInactive.size());
  }
+
+ protected Collection executeLuceneQuery(Object luceneQuery)
+ throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+ Collection results = null;
+ int retryCount = 10;
+ while (true) {
+ try {
+ results = (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
+ break;
+ } catch (Exception ex) {
+ if (!ex.getCause().getMessage().contains("currently indexing")) {
+ throw ex;
+ }
+ if (--retryCount == 0) {
+ throw ex;
+ }
+ }
+ }
+ return results;
+
+ }
+
+ protected void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize,
+ VM... vms) {
+ for (VM vm : vms) {
+ vm.invoke(() -> waitForRegionToHaveExpectedSize(regionName, expectedRegionSize));
+ vm.invoke(() -> verifyLuceneQueryResults(regionName, expectedRegionSize));
+ }
+
+ }
+
+ void invokeRunnableInVMs(CacheSerializableRunnable runnable, VM... vms) {
+ for (VM vm : vms) {
+ vm.invoke(runnable);
+ }
+ }
+
  // Used to close cache and make sure we attempt on all vms even if some do not have a cache
  void invokeRunnableInVMs(boolean catchErrors, CacheSerializableRunnable runnable,
      VM... vms) {
    for (VM vm : vms) {
      try {
        vm.invoke(runnable);
      } catch (Exception e) {
        // Deliberate best-effort swallow: with catchErrors set, a failure in
        // one VM must not prevent the runnable from running in the rest.
        if (!catchErrors) {
          throw e;
        }
      }
    }
  }
+
  private VM rollServerToCurrent(VM oldServer, int[] locatorPorts) {
    // Roll the server: close the old-version cache, restart the same dunit VM
    // id on the current version, recreate the cache and verify the version.
    oldServer.invoke(invokeCloseCache());
    VM rollServer = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldServer.getId());
    rollServer.invoke(invokeCreateCache(locatorPorts == null ? getSystemPropertiesPost71()
        : getSystemPropertiesPost71(locatorPorts)));
    rollServer.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
    return rollServer;
  }
+
  VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer,
      String regionType,
      File diskdir, String shortcutName, String regionName, int[] locatorPorts,
      boolean reindex) {
    // Roll the server to the current version, then recreate its Lucene index
    // (optionally reindexing existing data) and its region.
    VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
    return createLuceneIndexAndRegionOnRolledServer(regionType, diskdir, shortcutName, regionName,
        rollServer, reindex);
  }
+
  private VM createLuceneIndexAndRegionOnRolledServer(String regionType,
      File diskdir,
      String shortcutName, String regionName, VM rollServer, boolean reindex) {

    // LUCENE_REINDEX controls whether the index is built over pre-existing
    // region data; capture it in a serializable local for the remote lambda.
    Boolean serializeIt = reindex;
    rollServer.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = serializeIt);
    rollServer.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
    // recreate region on "rolled" server
    if ((regionType.equals("persistentPartitioned"))) {
      CacheSerializableRunnable runnable =
          invokeCreatePersistentPartitionedRegion(regionName, diskdir);
      invokeRunnableInVMs(runnable, rollServer);
    } else {
      invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
    }
    // Rebalance so the rolled server re-hosts its share of the buckets.
    rollServer.invoke(invokeRebalance());
    return rollServer;
  }
+
  VM rollServerToCurrentAndCreateRegionOnly(VM oldServer, String regionType, File diskdir,
      String shortcutName, String regionName, int[] locatorPorts) {
    // Roll the server to the current version and recreate only its region
    // (no Lucene index); persistent-partitioned regions need their disk dir.
    VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
    // recreate region on "rolled" server
    if ((regionType.equals("persistentPartitioned"))) {
      CacheSerializableRunnable runnable =
          invokeCreatePersistentPartitionedRegion(regionName, diskdir);
      invokeRunnableInVMs(runnable, rollServer);
    } else {
      invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
    }
    rollServer.invoke(invokeRebalance());
    return rollServer;
  }
+
  VM rollLocatorToCurrent(VM oldLocator, final String serverHostName, final int port,
      final String testName, final String locatorString) {
    // Roll the locator: stop the old-version locator, then restart the same
    // dunit VM id on the current version with cluster configuration disabled.
    oldLocator.invoke(invokeStopLocator());
    VM rollLocator = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldLocator.getId());
    final Properties props = new Properties();
    props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
    rollLocator.invoke(invokeStartLocator(serverHostName, port, testName, locatorString, props));
    return rollLocator;
  }
+
+ // Due to licensing changes
+ private Properties getSystemPropertiesPost71() {
+ Properties props = getSystemProperties();
+ return props;
+ }
+
+ // Due to licensing changes
+ private Properties getSystemPropertiesPost71(int[] locatorPorts) {
+ Properties props = getSystemProperties(locatorPorts);
+ return props;
+ }
+
  private Properties getSystemProperties() {
    // Start from the standard dunit distributed-system properties.
    // NOTE(review): the removals below presumably keep the configuration
    // compatible with the older members involved in the roll — confirm.
    Properties props = DistributedTestUtils.getAllDistributedSystemProperties(new Properties());
    props.remove("disable-auto-reconnect");
    props.remove(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME);
    props.remove(DistributionConfig.LOCK_MEMORY_NAME);
    return props;
  }
+
+ Properties getSystemProperties(int[] locatorPorts) {
+ Properties p = new Properties();
+ String locatorString = getLocatorString(locatorPorts);
+ p.setProperty("locators", locatorString);
+ p.setProperty("mcast-port", "0");
+ return p;
+ }
+
+ static String getLocatorString(int locatorPort) {
+ String locatorString = getDUnitLocatorAddress() + "[" + locatorPort + "]";
+ return locatorString;
+ }
+
+ static String getLocatorString(int[] locatorPorts) {
+ StringBuilder locatorString = new StringBuilder();
+ int numLocators = locatorPorts.length;
+ for (int i = 0; i < numLocators; i++) {
+ locatorString.append(getLocatorString(locatorPorts[i]));
+ if (i + 1 < numLocators) {
+ locatorString.append(",");
+ }
+ }
+ return locatorString.toString();
+ }
+
+ protected CacheSerializableRunnable invokeStartLocator(final String serverHostName,
+ final int port,
+ final String testName, final String locatorsString, final Properties props) {
+ return new CacheSerializableRunnable("execute: startLocator") {
+ @Override
+ public void run2() {
+ try {
+ startLocator(serverHostName, port, testName, locatorsString, props);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port,
+ final Properties props) {
+ return new CacheSerializableRunnable("execute: startLocator") {
+ @Override
+ public void run2() {
+ try {
+ startLocator(serverHostName, port, props);
+ } catch (Exception e) {
+ fail("Error starting locators", e);
+ }
+ }
+ };
+ }
+
+ CacheSerializableRunnable invokeCreateCache(final Properties systemProperties) {
+ return new CacheSerializableRunnable("execute: createCache") {
+ @Override
+ public void run2() {
+ try {
+ LuceneSearchWithRollingUpgradeTestBase.cache = createCache(systemProperties);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private CacheSerializableRunnable invokeAssertVersion(final short version) {
+ return new CacheSerializableRunnable("execute: assertVersion") {
+ @Override
+ public void run2() {
+ try {
+ assertVersion(LuceneSearchWithRollingUpgradeTestBase.cache, version);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ CacheSerializableRunnable invokeCreateRegion(final String regionName,
+ final String shortcutName) {
+ return new CacheSerializableRunnable("execute: createRegion") {
+ @Override
+ public void run2() {
+ try {
+ createRegion(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, shortcutName);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ protected CacheSerializableRunnable invokeCreatePersistentPartitionedRegion(
+ final String regionName,
+ final File diskstore) {
+ return new CacheSerializableRunnable("execute: createPersistentPartitonedRegion") {
+ @Override
+ public void run2() {
+ try {
+ createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeTestBase.cache, regionName,
+ diskstore);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ protected CacheSerializableRunnable invokePut(final String regionName, final Object key,
+ final Object value) {
+ return new CacheSerializableRunnable("execute: put") {
+ @Override
+ public void run2() {
+ try {
+ put(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, key, value);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ CacheSerializableRunnable invokeStopLocator() {
+ return new CacheSerializableRunnable("execute: stopLocator") {
+ @Override
+ public void run2() {
+ try {
+ stopLocator();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ CacheSerializableRunnable invokeCloseCache() {
+ return new CacheSerializableRunnable("execute: closeCache") {
+ @Override
+ public void run2() {
+ try {
+ closeCache(LuceneSearchWithRollingUpgradeTestBase.cache);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ protected CacheSerializableRunnable invokeRebalance() {
+ return new CacheSerializableRunnable("execute: rebalance") {
+ @Override
+ public void run2() {
+ try {
+ rebalance(LuceneSearchWithRollingUpgradeTestBase.cache);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
  protected void deleteDiskStores() {
    // Remove the shared disk-store directory. Failure to delete is fatal:
    // stale persistent data would corrupt later assertions.
    try {
      FileUtils.deleteDirectory(new File(diskDir).getAbsoluteFile());
    } catch (IOException e) {
      throw new Error("Error deleting files", e);
    }
  }
+
  protected static Object createCache(Properties systemProperties) throws Exception {
    // Create a Cache reflectively so this also works when the context
    // classloader holds an older product version.

    Class distConfigClass = Thread.currentThread().getContextClassLoader()
        .loadClass("org.apache.geode.distributed.internal.DistributionConfigImpl")
    ;
    boolean disableConfig = true;
    try {
      // NOTE(review): presence of this field is used as a version probe —
      // presumably versions that declare it require cluster configuration to
      // be explicitly disabled for this test. Confirm against the versions
      // under test.
      distConfigClass.getDeclaredField("useSharedConfiguration");
    } catch (NoSuchFieldException e) {
      disableConfig = false;
    }
    if (disableConfig) {
      systemProperties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
    }

    Class cacheFactoryClass = Thread.currentThread().getContextClassLoader()
        .loadClass("org.apache.geode.cache.CacheFactory");
    Constructor constructor = cacheFactoryClass.getConstructor(Properties.class);
    constructor.setAccessible(true);
    Object cacheFactory = constructor.newInstance(systemProperties);

    Method createMethod = cacheFactoryClass.getMethod("create");
    createMethod.setAccessible(true);
    Object cache = createMethod.invoke(cacheFactory);
    return cache;
  }
+
+ protected static Object getRegion(Object cache, String regionName) throws Exception {
+ return cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
+ }
+
+ protected static Object put(Object cache, String regionName, Object key, Object value)
+ throws Exception {
+ Object region = getRegion(cache, regionName);
+ return region.getClass().getMethod("put", Object.class, Object.class).invoke(region, key,
+ value);
+ }
+
+ protected static void createRegion(Object cache, String regionName, String shortcutName)
+ throws Exception {
+ Class aClass = Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.cache.RegionShortcut");
+ Object[] enumConstants = aClass.getEnumConstants();
+ Object shortcut = null;
+ int length = enumConstants.length;
+ for (int i = 0; i < length; i++) {
+ Object constant = enumConstants[i];
+ if (((Enum) constant).name().equals(shortcutName)) {
+ shortcut = constant;
+ break;
+ }
+ }
+
+ Method createRegionFactoryMethod = cache.getClass().getMethod("createRegionFactory", aClass);
+ createRegionFactoryMethod.setAccessible(true);
+ Object regionFactory = createRegionFactoryMethod.invoke(cache, shortcut);
+ Method createMethod = regionFactory.getClass().getMethod("create", String.class);
+ createMethod.setAccessible(true);
+ createMethod.invoke(regionFactory, regionName);
+ }
+
  static void createLuceneIndex(Object cache, String regionName, String indexName)
      throws Exception {
    // Reflectively create a Lucene index over field "status" (the standard
    // pre-region-creation index path).
    Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
        .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
    Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
    Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
    Method createLuceneIndexFactoryMethod =
        luceneService.getClass().getMethod("createIndexFactory");
    createLuceneIndexFactoryMethod.setAccessible(true);
    Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
    luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
        "status");
    luceneIndexFactory.getClass().getMethod("create", String.class, String.class)
        .invoke(luceneIndexFactory, indexName, regionName);
  }
+
+ static void createLuceneIndexOnExistingRegion(Object cache, String regionName,
+ String indexName) throws Exception {
+ Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
+ Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
+ Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
+ Method createLuceneIndexFactoryMethod =
+ luceneService.getClass().getMethod("createIndexFactory");
+ createLuceneIndexFactoryMethod.setAccessible(true);
+ Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
+ luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
+ "status");
+ luceneIndexFactory.getClass().getMethod("create", String.class, String.class, boolean.class)
+ .invoke(luceneIndexFactory, indexName, regionName, true);
+ }
+
  protected static void createPersistentPartitonedRegion(Object cache, String regionName,
      File diskStore) throws Exception {
    // Reflectively create a PERSISTENT_PARTITION region backed by disk store
    // "store", creating that disk store first if it does not exist yet.
    Object store = cache.getClass().getMethod("findDiskStore", String.class).invoke(cache, "store");
    Class dataPolicyObject = Thread.currentThread().getContextClassLoader()
        .loadClass("org.apache.geode.cache.DataPolicy");
    Object dataPolicy = dataPolicyObject.getField("PERSISTENT_PARTITION").get(null);
    if (store == null) {
      Object dsf = cache.getClass().getMethod("createDiskStoreFactory").invoke(cache);
      // 1MB max oplog size.
      dsf.getClass().getMethod("setMaxOplogSize", long.class).invoke(dsf, 1L);
      dsf.getClass().getMethod("setDiskDirs", File[].class).invoke(dsf,
          new Object[] {new File[] {diskStore.getAbsoluteFile()}});
      dsf.getClass().getMethod("create", String.class).invoke(dsf, "store");
    }
    Object rf = cache.getClass().getMethod("createRegionFactory").invoke(cache);
    rf.getClass().getMethod("setDiskStoreName", String.class).invoke(rf, "store");
    rf.getClass().getMethod("setDataPolicy", dataPolicy.getClass()).invoke(rf, dataPolicy);
    rf.getClass().getMethod("create", String.class).invoke(rf, regionName);
  }
+
+ protected static void assertVersion(Object cache, short ordinal) throws Exception {
+ Class idmClass = Thread.currentThread().getContextClassLoader()
+ .loadClass("org.apache.geode.distributed.internal.membership.InternalDistributedMember");
+ Method getDSMethod = cache.getClass().getMethod("getDistributedSystem");
+ getDSMethod.setAccessible(true);
+ Object ds = getDSMethod.invoke(cache);
+
+ Method getDistributedMemberMethod = ds.getClass().getMethod("getDistributedMember");
+ getDistributedMemberMethod.setAccessible(true);
+ Object member = getDistributedMemberMethod.invoke(ds);
+ Method getVersionObjectMethod = member.getClass().getMethod("getVersionObject");
+ getVersionObjectMethod.setAccessible(true);
+ Object thisVersion = getVersionObjectMethod.invoke(member);
+ Method getOrdinalMethod = thisVersion.getClass().getMethod("ordinal");
+ getOrdinalMethod.setAccessible(true);
+ short thisOrdinal = (Short) getOrdinalMethod.invoke(thisVersion);
+ if (ordinal != thisOrdinal) {
+ throw new Error(
+ "Version ordinal:" + thisOrdinal + " was not the expected ordinal of:" + ordinal);
+ }
+ }
+
+ protected static void stopCacheServers(Object cache) throws Exception {
+ Method getCacheServersMethod = cache.getClass().getMethod("getCacheServers");
+ getCacheServersMethod.setAccessible(true);
+ List cacheServers = (List) getCacheServersMethod.invoke(cache);
+ Method stopMethod = null;
+ for (Object cs : cacheServers) {
+ if (stopMethod == null) {
+ stopMethod = cs.getClass().getMethod("stop");
+ }
+ stopMethod.setAccessible(true);
+ stopMethod.invoke(cs);
+ }
+ }
+
+ protected static void closeCache(Object cache) throws Exception {
+ if (cache == null) {
+ return;
+ }
+ Method isClosedMethod = cache.getClass().getMethod("isClosed");
+ isClosedMethod.setAccessible(true);
+ boolean cacheClosed = (Boolean) isClosedMethod.invoke(cache);
+ if (cache != null && !cacheClosed) {
+ stopCacheServers(cache);
+ Method method = cache.getClass().getMethod("close");
+ method.setAccessible(true);
+ method.invoke(cache);
+ long startTime = System.currentTimeMillis();
+ while (!cacheClosed && System.currentTimeMillis() - startTime < 30000) {
+ try {
+ Thread.sleep(1000);
+ Method cacheClosedMethod = cache.getClass().getMethod("isClosed");
+ cacheClosedMethod.setAccessible(true);
+ cacheClosed = (Boolean) cacheClosedMethod.invoke(cache);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ protected static void rebalance(Object cache) throws Exception {
+ Method getRMMethod = cache.getClass().getMethod("getResourceManager");
+ getRMMethod.setAccessible(true);
+ Object manager = getRMMethod.invoke(cache);
+
+ Method createRebalanceFactoryMethod = manager.getClass().getMethod("createRebalanceFactory");
+ createRebalanceFactoryMethod.setAccessible(true);
+ Object rebalanceFactory = createRebalanceFactoryMethod.invoke(manager);
+ Method m = rebalanceFactory.getClass().getMethod("start");
+ m.setAccessible(true);
+ Object op = m.invoke(rebalanceFactory);
+
+ // Wait until the rebalance is complete
+ try {
+ Method getResultsMethod = op.getClass().getMethod("getResults");
+ getResultsMethod.setAccessible(true);
+ Object results = getResultsMethod.invoke(op);
+ Method getTotalTimeMethod = results.getClass().getMethod("getTotalTime");
+ getTotalTimeMethod.setAccessible(true);
+ System.out.println("Took " + getTotalTimeMethod.invoke(results) + " milliseconds\n");
+ Method getTotalBucketsMethod = results.getClass().getMethod("getTotalBucketTransferBytes");
+ getTotalBucketsMethod.setAccessible(true);
+ System.out.println("Transfered " + getTotalBucketsMethod.invoke(results) + "bytes\n");
+ } catch (Exception e) {
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ }
+
  /**
   * Starts a locator (plus its distributed system) with the given
   * configuration, via reflection so the locator class can come from an older
   * product version's classloader.
   */
  protected static void startLocator(final String serverHostName, final int port,
      final String testName, final String locatorsString, final Properties props) throws Exception {
    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
    props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
    Logger logger = LogService.getLogger();
    props.setProperty(DistributionConfig.LOG_LEVEL_NAME, logger.getLevel().name());

    InetAddress bindAddr;
    try {
      bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
    } catch (UnknownHostException uhe) {
      throw new Error("While resolving bind address ", uhe);
    }

    File logFile = new File(testName + "-locator" + port + ".log");
    Class locatorClass = Thread.currentThread().getContextClassLoader()
        .loadClass("org.apache.geode.distributed.Locator");
    Method startLocatorAndDSMethod =
        locatorClass.getMethod("startLocatorAndDS", int.class, File.class, InetAddress.class,
            Properties.class, boolean.class, boolean.class, String.class);
    startLocatorAndDSMethod.setAccessible(true);
    startLocatorAndDSMethod.invoke(null, port, logFile, bindAddr, props, true, true, null);
  }
+
  protected static void startLocator(final String serverHostName, final int port, Properties props)
      throws Exception {

    // Current-version variant: calls Locator directly rather than reflectively.
    InetAddress bindAddr = null;
    try {
      bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
    } catch (UnknownHostException uhe) {
      throw new Error("While resolving bind address ", uhe);
    }

    Locator.startLocatorAndDS(port, new File(""), bindAddr, props, true, true, null);
    Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available
  }
+
  protected static void stopLocator() throws Exception {
    // Stop the locator running in this VM via reflection (InternalLocator may
    // come from an older product version's classloader).
    Class internalLocatorClass = Thread.currentThread().getContextClassLoader()
        .loadClass("org.apache.geode.distributed.internal.InternalLocator");
    Method locatorMethod = internalLocatorClass.getMethod("getLocator");
    locatorMethod.setAccessible(true);
    Object locator = locatorMethod.invoke(null);
    Method stopLocatorMethod = locator.getClass().getMethod("stop");
    stopLocatorMethod.setAccessible(true);
    stopLocatorMethod.invoke(locator);
  }
+
  /**
   * Get the host name that the standard dunit locator is listening on.
   *
   * @return the locator host address (name only, no port)
   */
  protected static String getDUnitLocatorAddress() {
    return Host.getHost(0).getHostName();
  }
+
  protected Object executeDummyFunction(String regionName) {
    // Execute the registered "TestFunction" on the region and assert that its
    // first (boolean) result is true; returns the raw result list.
    Region region = ((InternalCache) cache).getRegion(regionName);
    Object result = FunctionService.onRegion(region).execute("TestFunction").getResult();
    ArrayList<Boolean> list = (ArrayList) result;
    assertThat(list.get(0)).isTrue();
    return result;
  }
+
+ static class TestFunction implements Function, Declarable {
+
+ public void execute(FunctionContext context) {
+ context.getResultSender().lastResult(true);
+ }
+
+ public String getId() {
+ return getClass().getSimpleName();
+ }
+
+ public boolean optimizeForWrite() {
+ return true;
+ }
+ }
+
+}
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java
index cb68055..8178ea6 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultAfterTwoLocatorsWithTwoServersAreRolled.java
@@ -84,7 +84,7 @@
locatorString);
server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null,
- shortcut.name(), regionName, locatorPorts);
+ shortcut.name(), regionName, locatorPorts, reindex);
expectedRegionSize += 10;
putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 15,
25, server2);
@@ -97,7 +97,7 @@
30, server1);
server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, null,
- shortcut.name(), regionName, locatorPorts);
+ shortcut.name(), regionName, locatorPorts, reindex);
expectedRegionSize += 5;
putSerializableObjectAndVerifyLuceneQueryResult(server2, regionName, expectedRegionSize, 25,
35, server1, server2);
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java
new file mode 100644
index 0000000..b2e3f94
--- /dev/null
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.lucene;
+
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.test.dunit.DistributedTestUtils;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.NetworkUtils;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+import org.apache.geode.test.version.VersionManager;
+
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRestartedFromCurrentVersion
+ extends LuceneSearchWithRollingUpgradeTestBase {
+
+ @Parameterized.Parameter()
+ public Boolean reindex;
+
+ @Parameterized.Parameter(1)
+ public Boolean singleHopEnabled;
+
+ @Parameterized.Parameters(name = "currentVersion, reindex={0}, singleHopEnabled={1}")
+ public static Collection<Object[]> data() {
+ Collection<Object[]> rval = new ArrayList<>();
+ rval.add(new Object[] {true, true});
+ rval.add(new Object[] {true, false});
+ rval.add(new Object[] {false, true});
+ rval.add(new Object[] {false, false});
+ return rval;
+ }
+
+ @Test
+ public void functionsFailOverWhenRestartOneServer()
+ throws Exception {
+ // Since the changes relating to GEODE-7258 are not applied on 1.10.0,
+ // use this test to roll from develop to develop to verify.
+ final Host host = Host.getHost(0);
+ VM locator = host.getVM(VersionManager.CURRENT_VERSION, 0);
+ VM server1 = host.getVM(VersionManager.CURRENT_VERSION, 1);
+ VM server2 = host.getVM(VersionManager.CURRENT_VERSION, 2);
+ VM client = host.getVM(VersionManager.CURRENT_VERSION, 3);
+
+ final String regionName = "aRegion";
+ String regionType = "partitionedRedundant";
+ RegionShortcut shortcut = RegionShortcut.PARTITION_REDUNDANT;
+
+ int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+ int[] locatorPorts = new int[] {ports[0]};
+ int[] csPorts = new int[] {ports[1], ports[2]};
+
+ locator.invoke(() -> DistributedTestUtils.deleteLocatorStateFile(locatorPorts));
+
+ String hostName = NetworkUtils.getServerHostName(host);
+ String[] hostNames = new String[] {hostName};
+ String locatorString = getLocatorString(locatorPorts);
+
+ try {
+ // Start locator, servers and client in old version
+ locator.invoke(
+ invokeStartLocator(hostName, locatorPorts[0], getLocatorPropertiesPre91(locatorString)));
+
+ // Locators before 1.4 handled configuration asynchronously.
+ // We must wait for the cluster configuration to be ready, or confirm that it is disabled.
+ locator.invoke(
+ () -> await()
+ .untilAsserted(() -> assertTrue(
+ !InternalLocator.getLocator().getConfig().getEnableClusterConfiguration()
+ || InternalLocator.getLocator().isSharedConfigurationRunning())));
+
+ invokeRunnableInVMs(invokeCreateCache(getSystemProperties(locatorPorts)), server1, server2);
+ invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1);
+ invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2);
+ invokeRunnableInVMs(
+ invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+ singleHopEnabled),
+ client);
+
+ // Create the index on the servers
+ server1.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
+ server2.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
+
+ // Create the region on the servers and client
+ invokeRunnableInVMs(invokeCreateRegion(regionName, shortcut.name()), server1, server2);
+ invokeRunnableInVMs(invokeCreateClientRegion(regionName, ClientRegionShortcut.PROXY), client);
+
+ // Put objects on the client so that each bucket is created
+ int numObjects = 113;
+ putSerializableObject(client, regionName, 0, numObjects);
+
+ // Execute a query on the client and verify the results. This also waits until flushed.
+ client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));
+
+ // Roll the locator and server 1 to current version
+ locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0], getTestMethodName(),
+ locatorString);
+ server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null,
+ shortcut.name(), regionName, locatorPorts, reindex);
+
+ // Execute a query on the client and verify the results. This also waits until flushed.
+ client.invoke(() -> {
+ updateClientSingleHopMetadata(regionName);
+ verifyLuceneQueryResults(regionName, numObjects);
+ });
+
+ // Put some objects on the client. This will update the document to the latest lucene version
+ putSerializableObject(client, regionName, 0, numObjects);
+
+ // Execute a query on the client and verify the results. This also waits until flushed.
+ client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));
+
+ // Close server 1 cache. This will force server 2 (old version) to become primary
+ invokeRunnableInVMs(true, invokeCloseCache(), server1);
+
+ // Execute a query on the client and verify the results
+ client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));
+ } finally {
+ invokeRunnableInVMs(true, invokeStopLocator(), locator);
+ invokeRunnableInVMs(true, invokeCloseCache(), client, server2);
+ }
+ }
+
+ @Test
+ public void stopOneServerThenRerunFunction()
+ throws Throwable {
+ // Since the changes relating to GEODE-7258 are not applied on 1.10.0,
+ // use this test to roll from develop to develop to verify.
+ final Host host = Host.getHost(0);
+ VM locator = host.getVM(VersionManager.CURRENT_VERSION, 0);
+ VM server1 = host.getVM(VersionManager.CURRENT_VERSION, 1);
+ VM server2 = host.getVM(VersionManager.CURRENT_VERSION, 2);
+ VM client = host.getVM(VersionManager.CURRENT_VERSION, 3);
+
+ final String regionName = "aRegion";
+ String regionType = "partitionedRedundant";
+ RegionShortcut shortcut = RegionShortcut.PARTITION_REDUNDANT;
+
+ int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(3);
+ int[] locatorPorts = new int[] {ports[0]};
+ int[] csPorts = new int[] {ports[1], ports[2]};
+
+ locator
+ .invoke(() -> DistributedTestUtils.deleteLocatorStateFile(locatorPorts));
+
+ String hostName = NetworkUtils.getServerHostName(host);
+ String[] hostNames = new String[] {hostName};
+ String locatorString = getLocatorString(locatorPorts);
+
+ try {
+ // Start locator, servers and client in old version
+ locator.invoke(
+ invokeStartLocator(hostName, locatorPorts[0],
+ getLocatorPropertiesPre91(locatorString)));
+
+ // Locators before 1.4 handled configuration asynchronously.
+ // We must wait for the cluster configuration to be ready, or confirm that it is disabled.
+ locator.invoke(
+ () -> await()
+ .untilAsserted(() -> assertTrue(
+ !InternalLocator.getLocator().getConfig()
+ .getEnableClusterConfiguration()
+ || InternalLocator.getLocator().isSharedConfigurationRunning())));
+
+ invokeRunnableInVMs(invokeCreateCache(getSystemProperties(locatorPorts)),
+ server1, server2);
+ invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1);
+ invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2);
+ invokeRunnableInVMs(
+ invokeCreateClientCache(getClientSystemProperties(), hostNames,
+ locatorPorts, false,
+ singleHopEnabled),
+ client);
+
+ // Create the region on the servers and client
+ invokeRunnableInVMs(invokeCreateRegion(regionName, shortcut.name()),
+ server1, server2);
+ invokeRunnableInVMs(
+ invokeCreateClientRegion(regionName, ClientRegionShortcut.PROXY),
+ client);
+ server1
+ .invoke(() -> FunctionService.registerFunction(new TestFunction()));
+ server2
+ .invoke(() -> FunctionService.registerFunction(new TestFunction()));
+
+ // Put objects on the client so that each bucket is created
+ int numObjects = 113;
+ putSerializableObject(client, regionName, 0, numObjects);
+
+ client.invoke(
+ () -> await().untilAsserted(() -> {
+ ArrayList<Boolean> result = (ArrayList<Boolean>) executeDummyFunction(
+ regionName);
+ assertTrue(result.size() == 2);
+ }));
+
+ server1.invoke(invokeCloseCache());
+
+ client.invoke(() -> {
+ Object result = executeDummyFunction(regionName);
+ ArrayList<Boolean> list = (ArrayList) result;
+ assertThat(list.get(0)).isTrue();
+ });
+ } finally {
+ invokeRunnableInVMs(true, invokeStopLocator(), locator);
+ invokeRunnableInVMs(true, invokeCloseCache(), client, server2);
+ }
+ }
+}
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
index 95c0498..df1e329 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOver.java
@@ -70,7 +70,8 @@
invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3);
invokeRunnableInVMs(
- invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false),
+ invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+ singleHopEnabled),
client);
server2.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
server3.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
@@ -87,7 +88,7 @@
locatorString);
server3 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server3, regionType, null,
- shortcut.name(), regionName, locatorPorts);
+ shortcut.name(), regionName, locatorPorts, reindex);
invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server3);
expectedRegionSize += 10;
putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 20,
@@ -97,7 +98,7 @@
40, server2);
server2 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server2, regionType, null,
- shortcut.name(), regionName, locatorPorts);
+ shortcut.name(), regionName, locatorPorts, reindex);
invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server2);
expectedRegionSize += 10;
putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 40,
@@ -107,7 +108,7 @@
60, server3);
client = rollClientToCurrentAndCreateRegion(client, ClientRegionShortcut.PROXY, regionName,
- hostNames, locatorPorts, false);
+ hostNames, locatorPorts, false, singleHopEnabled);
expectedRegionSize += 10;
putSerializableObjectAndVerifyLuceneQueryResult(client, regionName, expectedRegionSize, 60,
70, server2, server3);
diff --git a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
index 3acba6c..430e8fa 100644
--- a/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
+++ b/geode-lucene/src/upgradeTest/java/org/apache/geode/cache/lucene/RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated.java
@@ -17,7 +17,6 @@
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertTrue;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.geode.cache.RegionShortcut;
@@ -32,7 +31,6 @@
public class RollingUpgradeQueryReturnsCorrectResultsAfterClientAndServersAreRolledOverAllBucketsCreated
extends LuceneSearchWithRollingUpgradeDUnit {
- @Ignore("Disabled until GEODE-7258 is fixed")
@Test
public void test()
throws Exception {
@@ -85,7 +83,8 @@
invokeRunnableInVMs(invokeStartCacheServer(csPorts[0]), server1);
invokeRunnableInVMs(invokeStartCacheServer(csPorts[1]), server2);
invokeRunnableInVMs(
- invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false),
+ invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts, false,
+ singleHopEnabled),
client);
// Create the index on the servers
@@ -107,7 +106,7 @@
locator = rollLocatorToCurrent(locator, hostName, locatorPorts[0], getTestMethodName(),
locatorString);
server1 = rollServerToCurrentCreateLuceneIndexAndCreateRegion(server1, regionType, null,
- shortcut.name(), regionName, locatorPorts);
+ shortcut.name(), regionName, locatorPorts, reindex);
// Execute a query on the client and verify the results. This also waits until flushed.
client.invoke(() -> verifyLuceneQueryResults(regionName, numObjects));