blob: b08d875381bfe5c00482faf5f809047118327d76 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.lucene;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.dunit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.GemFireCache;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.internal.ClientMetadataService;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.lucene.internal.LuceneServiceImpl;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.distributed.Locator;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.internal.DUnitLauncher;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.version.VersionManager;
public abstract class LuceneSearchWithRollingUpgradeTestBase extends JUnit4DistributedTestCase {
protected File[] testingDirs = new File[3];
protected static String INDEX_NAME = "index";
protected static String diskDir = "LuceneSearchWithRollingUpgradeTestBase";
// Each vm will have a cache object
protected static Object cache;
protected void deleteVMFiles() {
System.out.println("deleting files in vm" + VM.getCurrentVMNum());
File pwd = new File(".");
for (File entry : pwd.listFiles()) {
try {
if (entry.isDirectory()) {
FileUtils.deleteDirectory(entry);
} else {
entry.delete();
}
} catch (Exception e) {
System.out.println("Could not delete " + entry + ": " + e.getMessage());
}
}
}
protected void deleteWorkingDirFiles() {
Invoke.invokeInEveryVM("delete files", () -> deleteVMFiles());
}
@Override
public void postSetUp() {
deleteWorkingDirFiles();
IgnoredException.addIgnoredException(
"cluster configuration service not available|ConflictingPersistentDataException");
}
Properties getLocatorPropertiesPre91(String locatorsString) {
Properties props = new Properties();
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel);
props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "true");
return props;
}
VM rollClientToCurrentAndCreateRegion(VM oldClient,
ClientRegionShortcut shortcut,
String regionName, String[] hostNames, int[] locatorPorts,
boolean subscriptionEnabled, boolean singleHopEnabled) {
VM rollClient = rollClientToCurrent(oldClient, hostNames, locatorPorts, subscriptionEnabled,
singleHopEnabled);
// recreate region on "rolled" client
invokeRunnableInVMs(invokeCreateClientRegion(regionName, shortcut), rollClient);
return rollClient;
}
protected VM rollClientToCurrent(VM oldClient, String[] hostNames,
int[] locatorPorts,
boolean subscriptionEnabled, boolean singleHopEnabled) {
oldClient.invoke(invokeCloseCache());
VM rollClient = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldClient.getId());
rollClient.invoke(invokeCreateClientCache(getClientSystemProperties(), hostNames, locatorPorts,
subscriptionEnabled, singleHopEnabled));
rollClient.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
return rollClient;
}
CacheSerializableRunnable invokeCreateClientRegion(final String regionName,
final ClientRegionShortcut shortcut) {
return new CacheSerializableRunnable("execute: createClientRegion") {
@Override
public void run2() {
try {
createClientRegion((GemFireCache) LuceneSearchWithRollingUpgradeTestBase.cache,
regionName,
shortcut);
} catch (Exception e) {
fail("Error creating client region", e);
}
}
};
}
protected static void createClientRegion(GemFireCache cache, String regionName,
ClientRegionShortcut shortcut) {
ClientRegionFactory rf = ((ClientCache) cache).createClientRegionFactory(shortcut);
rf.create(regionName);
}
CacheSerializableRunnable invokeStartCacheServer(final int port) {
return new CacheSerializableRunnable("execute: startCacheServer") {
@Override
public void run2() {
try {
startCacheServer((GemFireCache) LuceneSearchWithRollingUpgradeTestBase.cache, port);
} catch (Exception e) {
fail("Error creating cache", e);
}
}
};
}
protected static void startCacheServer(GemFireCache cache, int port) throws Exception {
CacheServer cacheServer = ((GemFireCacheImpl) cache).addCacheServer();
cacheServer.setPort(port);
cacheServer.start();
}
CacheSerializableRunnable invokeCreateClientCache(
final Properties systemProperties,
final String[] hosts, final int[] ports, boolean subscriptionEnabled,
boolean singleHopEnabled) {
return new CacheSerializableRunnable("execute: createClientCache") {
@Override
public void run2() {
try {
LuceneSearchWithRollingUpgradeTestBase.cache =
createClientCache(systemProperties, hosts, ports, subscriptionEnabled,
singleHopEnabled);
} catch (Exception e) {
fail("Error creating client cache", e);
}
}
};
}
Properties getClientSystemProperties() {
Properties p = new Properties();
p.setProperty("mcast-port", "0");
return p;
}
protected static ClientCache createClientCache(Properties systemProperties,
String[] hosts,
int[] ports, boolean subscriptionEnabled, boolean singleHopEnabled) {
ClientCacheFactory cf = new ClientCacheFactory(systemProperties);
if (subscriptionEnabled) {
cf.setPoolSubscriptionEnabled(true);
cf.setPoolSubscriptionRedundancy(-1);
}
cf.setPoolPRSingleHopEnabled(singleHopEnabled);
int hostsLength = hosts.length;
for (int i = 0; i < hostsLength; i++) {
cf.addPoolLocator(hosts[i], ports[i]);
}
return cf.create();
}
void putSerializableObjectAndVerifyLuceneQueryResult(VM putter, String regionName,
int expectedRegionSize, int start, int end, VM... vms) throws Exception {
// do puts
putSerializableObject(putter, regionName, start, end);
// verify present in others
verifyLuceneQueryResultInEachVM(regionName, expectedRegionSize, vms);
}
void putSerializableObject(VM putter, String regionName, int start, int end)
throws Exception {
for (int i = start; i < end; i++) {
Class aClass = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.cache.query.data.Portfolio");
Constructor portfolioConstructor = aClass.getConstructor(int.class);
Object serializableObject = portfolioConstructor.newInstance(i);
putter.invoke(invokePut(regionName, i, serializableObject));
}
}
private void waitForRegionToHaveExpectedSize(String regionName, int expectedRegionSize) {
await().untilAsserted(() -> {
Object region =
cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
int regionSize = (int) region.getClass().getMethod("size").invoke(region);
assertEquals("Region size not as expected after 60 seconds", expectedRegionSize,
regionSize);
});
}
void updateClientSingleHopMetadata(String regionName) {
ClientMetadataService cms = ((InternalCache) cache)
.getClientMetadataService();
cms.scheduleGetPRMetaData(
(LocalRegion) ((InternalCache) cache).getRegion(regionName), true);
GeodeAwaitility.await("Awaiting ClientMetadataService.isMetadataStable()")
.untilAsserted(() -> assertThat(cms.isMetadataStable()).isTrue());
}
void verifyLuceneQueryResults(String regionName, int expectedRegionSize)
throws Exception {
Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
luceneService.getClass()
.getMethod("waitUntilFlushed", String.class, String.class, long.class, TimeUnit.class)
.invoke(luceneService, INDEX_NAME, regionName, 60, TimeUnit.SECONDS);
Method createLuceneQueryFactoryMethod =
luceneService.getClass().getMethod("createLuceneQueryFactory");
createLuceneQueryFactoryMethod.setAccessible(true);
Object luceneQueryFactory = createLuceneQueryFactoryMethod.invoke(luceneService);
Object luceneQuery = luceneQueryFactory.getClass()
.getMethod("create", String.class, String.class, String.class, String.class)
.invoke(luceneQueryFactory, INDEX_NAME, regionName, "active", "status");
Collection resultsActive = executeLuceneQuery(luceneQuery);
luceneQuery = luceneQueryFactory.getClass()
.getMethod("create", String.class, String.class, String.class, String.class)
.invoke(luceneQueryFactory, INDEX_NAME, regionName, "inactive", "status");
Collection resultsInactive = executeLuceneQuery(luceneQuery);
assertEquals("Result size not as expected ", expectedRegionSize,
resultsActive.size() + resultsInactive.size());
}
protected Collection executeLuceneQuery(Object luceneQuery)
throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
Collection results = null;
int retryCount = 10;
while (true) {
try {
results = (Collection) luceneQuery.getClass().getMethod("findKeys").invoke(luceneQuery);
break;
} catch (Exception ex) {
if (!ex.getCause().getMessage().contains("currently indexing")) {
throw ex;
}
if (--retryCount == 0) {
throw ex;
}
}
}
return results;
}
protected void verifyLuceneQueryResultInEachVM(String regionName, int expectedRegionSize,
VM... vms) {
for (VM vm : vms) {
vm.invoke(() -> waitForRegionToHaveExpectedSize(regionName, expectedRegionSize));
vm.invoke(() -> verifyLuceneQueryResults(regionName, expectedRegionSize));
}
}
void invokeRunnableInVMs(CacheSerializableRunnable runnable, VM... vms) {
for (VM vm : vms) {
vm.invoke(runnable);
}
}
// Used to close cache and make sure we attempt on all vms even if some do not have a cache
void invokeRunnableInVMs(boolean catchErrors, CacheSerializableRunnable runnable,
VM... vms) {
for (VM vm : vms) {
try {
vm.invoke(runnable);
} catch (Exception e) {
if (!catchErrors) {
throw e;
}
}
}
}
private VM rollServerToCurrent(VM oldServer, int[] locatorPorts) {
// Roll the server
oldServer.invoke(invokeCloseCache());
VM rollServer = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldServer.getId());
rollServer.invoke(invokeCreateCache(locatorPorts == null ? getSystemPropertiesPost71()
: getSystemPropertiesPost71(locatorPorts)));
rollServer.invoke(invokeAssertVersion(Version.CURRENT_ORDINAL));
return rollServer;
}
VM rollServerToCurrentCreateLuceneIndexAndCreateRegion(VM oldServer,
String regionType,
File diskdir, String shortcutName, String regionName, int[] locatorPorts,
boolean reindex) {
VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
return createLuceneIndexAndRegionOnRolledServer(regionType, diskdir, shortcutName, regionName,
rollServer, reindex);
}
private VM createLuceneIndexAndRegionOnRolledServer(String regionType,
File diskdir,
String shortcutName, String regionName, VM rollServer, boolean reindex) {
Boolean serializeIt = reindex;
rollServer.invoke(() -> LuceneServiceImpl.LUCENE_REINDEX = serializeIt);
rollServer.invoke(() -> createLuceneIndex(cache, regionName, INDEX_NAME));
// recreate region on "rolled" server
if ((regionType.equals("persistentPartitioned"))) {
CacheSerializableRunnable runnable =
invokeCreatePersistentPartitionedRegion(regionName, diskdir);
invokeRunnableInVMs(runnable, rollServer);
} else {
invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
}
rollServer.invoke(invokeRebalance());
return rollServer;
}
VM rollServerToCurrentAndCreateRegionOnly(VM oldServer, String regionType, File diskdir,
String shortcutName, String regionName, int[] locatorPorts) {
VM rollServer = rollServerToCurrent(oldServer, locatorPorts);
// recreate region on "rolled" server
if ((regionType.equals("persistentPartitioned"))) {
CacheSerializableRunnable runnable =
invokeCreatePersistentPartitionedRegion(regionName, diskdir);
invokeRunnableInVMs(runnable, rollServer);
} else {
invokeRunnableInVMs(invokeCreateRegion(regionName, shortcutName), rollServer);
}
rollServer.invoke(invokeRebalance());
return rollServer;
}
VM rollLocatorToCurrent(VM oldLocator, final String serverHostName, final int port,
final String testName, final String locatorString) {
// Roll the locator
oldLocator.invoke(invokeStopLocator());
VM rollLocator = Host.getHost(0).getVM(VersionManager.CURRENT_VERSION, oldLocator.getId());
final Properties props = new Properties();
props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false");
rollLocator.invoke(invokeStartLocator(serverHostName, port, testName, locatorString, props));
return rollLocator;
}
// Due to licensing changes
private Properties getSystemPropertiesPost71() {
Properties props = getSystemProperties();
return props;
}
// Due to licensing changes
private Properties getSystemPropertiesPost71(int[] locatorPorts) {
Properties props = getSystemProperties(locatorPorts);
return props;
}
private Properties getSystemProperties() {
Properties props = DistributedTestUtils.getAllDistributedSystemProperties(new Properties());
props.remove("disable-auto-reconnect");
props.remove(DistributionConfig.OFF_HEAP_MEMORY_SIZE_NAME);
props.remove(DistributionConfig.LOCK_MEMORY_NAME);
return props;
}
Properties getSystemProperties(int[] locatorPorts) {
Properties p = new Properties();
String locatorString = getLocatorString(locatorPorts);
p.setProperty("locators", locatorString);
p.setProperty("mcast-port", "0");
return p;
}
static String getLocatorString(int locatorPort) {
String locatorString = getDUnitLocatorAddress() + "[" + locatorPort + "]";
return locatorString;
}
static String getLocatorString(int[] locatorPorts) {
StringBuilder locatorString = new StringBuilder();
int numLocators = locatorPorts.length;
for (int i = 0; i < numLocators; i++) {
locatorString.append(getLocatorString(locatorPorts[i]));
if (i + 1 < numLocators) {
locatorString.append(",");
}
}
return locatorString.toString();
}
protected CacheSerializableRunnable invokeStartLocator(final String serverHostName,
final int port,
final String testName, final String locatorsString, final Properties props) {
return new CacheSerializableRunnable("execute: startLocator") {
@Override
public void run2() {
try {
startLocator(serverHostName, port, testName, locatorsString, props);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
CacheSerializableRunnable invokeStartLocator(final String serverHostName, final int port,
final Properties props) {
return new CacheSerializableRunnable("execute: startLocator") {
@Override
public void run2() {
try {
startLocator(serverHostName, port, props);
} catch (Exception e) {
fail("Error starting locators", e);
}
}
};
}
CacheSerializableRunnable invokeCreateCache(final Properties systemProperties) {
return new CacheSerializableRunnable("execute: createCache") {
@Override
public void run2() {
try {
LuceneSearchWithRollingUpgradeTestBase.cache = createCache(systemProperties);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
private CacheSerializableRunnable invokeAssertVersion(final short version) {
return new CacheSerializableRunnable("execute: assertVersion") {
@Override
public void run2() {
try {
assertVersion(LuceneSearchWithRollingUpgradeTestBase.cache, version);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
CacheSerializableRunnable invokeCreateRegion(final String regionName,
final String shortcutName) {
return new CacheSerializableRunnable("execute: createRegion") {
@Override
public void run2() {
try {
createRegion(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, shortcutName);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
protected CacheSerializableRunnable invokeCreatePersistentPartitionedRegion(
final String regionName,
final File diskstore) {
return new CacheSerializableRunnable("execute: createPersistentPartitonedRegion") {
@Override
public void run2() {
try {
createPersistentPartitonedRegion(LuceneSearchWithRollingUpgradeTestBase.cache, regionName,
diskstore);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
protected CacheSerializableRunnable invokePut(final String regionName, final Object key,
final Object value) {
return new CacheSerializableRunnable("execute: put") {
@Override
public void run2() {
try {
put(LuceneSearchWithRollingUpgradeTestBase.cache, regionName, key, value);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
CacheSerializableRunnable invokeStopLocator() {
return new CacheSerializableRunnable("execute: stopLocator") {
@Override
public void run2() {
try {
stopLocator();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
CacheSerializableRunnable invokeCloseCache() {
return new CacheSerializableRunnable("execute: closeCache") {
@Override
public void run2() {
try {
closeCache(LuceneSearchWithRollingUpgradeTestBase.cache);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
protected CacheSerializableRunnable invokeRebalance() {
return new CacheSerializableRunnable("execute: rebalance") {
@Override
public void run2() {
try {
rebalance(LuceneSearchWithRollingUpgradeTestBase.cache);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}
protected void deleteDiskStores() {
try {
FileUtils.deleteDirectory(new File(diskDir).getAbsoluteFile());
} catch (IOException e) {
throw new Error("Error deleting files", e);
}
}
protected static Object createCache(Properties systemProperties) throws Exception {
Class distConfigClass = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.distributed.internal.DistributionConfigImpl");
boolean disableConfig = true;
try {
distConfigClass.getDeclaredField("useSharedConfiguration");
} catch (NoSuchFieldException e) {
disableConfig = false;
}
if (disableConfig) {
systemProperties.put(DistributionConfig.USE_CLUSTER_CONFIGURATION_NAME, "false");
}
Class cacheFactoryClass = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.cache.CacheFactory");
Constructor constructor = cacheFactoryClass.getConstructor(Properties.class);
constructor.setAccessible(true);
Object cacheFactory = constructor.newInstance(systemProperties);
Method createMethod = cacheFactoryClass.getMethod("create");
createMethod.setAccessible(true);
Object cache = createMethod.invoke(cacheFactory);
return cache;
}
protected static Object getRegion(Object cache, String regionName) throws Exception {
return cache.getClass().getMethod("getRegion", String.class).invoke(cache, regionName);
}
protected static Object put(Object cache, String regionName, Object key, Object value)
throws Exception {
Object region = getRegion(cache, regionName);
return region.getClass().getMethod("put", Object.class, Object.class).invoke(region, key,
value);
}
protected static void createRegion(Object cache, String regionName, String shortcutName)
throws Exception {
Class aClass = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.cache.RegionShortcut");
Object[] enumConstants = aClass.getEnumConstants();
Object shortcut = null;
int length = enumConstants.length;
for (int i = 0; i < length; i++) {
Object constant = enumConstants[i];
if (((Enum) constant).name().equals(shortcutName)) {
shortcut = constant;
break;
}
}
Method createRegionFactoryMethod = cache.getClass().getMethod("createRegionFactory", aClass);
createRegionFactoryMethod.setAccessible(true);
Object regionFactory = createRegionFactoryMethod.invoke(cache, shortcut);
Method createMethod = regionFactory.getClass().getMethod("create", String.class);
createMethod.setAccessible(true);
createMethod.invoke(regionFactory, regionName);
}
static void createLuceneIndex(Object cache, String regionName, String indexName)
throws Exception {
Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
Method createLuceneIndexFactoryMethod =
luceneService.getClass().getMethod("createIndexFactory");
createLuceneIndexFactoryMethod.setAccessible(true);
Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
"status");
luceneIndexFactory.getClass().getMethod("create", String.class, String.class)
.invoke(luceneIndexFactory, indexName, regionName);
}
static void createLuceneIndexOnExistingRegion(Object cache, String regionName,
String indexName) throws Exception {
Class luceneServiceProvider = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.cache.lucene.LuceneServiceProvider");
Method getLuceneService = luceneServiceProvider.getMethod("get", GemFireCache.class);
Object luceneService = getLuceneService.invoke(luceneServiceProvider, cache);
Method createLuceneIndexFactoryMethod =
luceneService.getClass().getMethod("createIndexFactory");
createLuceneIndexFactoryMethod.setAccessible(true);
Object luceneIndexFactory = createLuceneIndexFactoryMethod.invoke(luceneService);
luceneIndexFactory.getClass().getMethod("addField", String.class).invoke(luceneIndexFactory,
"status");
luceneIndexFactory.getClass().getMethod("create", String.class, String.class, boolean.class)
.invoke(luceneIndexFactory, indexName, regionName, true);
}
protected static void createPersistentPartitonedRegion(Object cache, String regionName,
File diskStore) throws Exception {
Object store = cache.getClass().getMethod("findDiskStore", String.class).invoke(cache, "store");
Class dataPolicyObject = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.cache.DataPolicy");
Object dataPolicy = dataPolicyObject.getField("PERSISTENT_PARTITION").get(null);
if (store == null) {
Object dsf = cache.getClass().getMethod("createDiskStoreFactory").invoke(cache);
dsf.getClass().getMethod("setMaxOplogSize", long.class).invoke(dsf, 1L);
dsf.getClass().getMethod("setDiskDirs", File[].class).invoke(dsf,
new Object[] {new File[] {diskStore.getAbsoluteFile()}});
dsf.getClass().getMethod("create", String.class).invoke(dsf, "store");
}
Object rf = cache.getClass().getMethod("createRegionFactory").invoke(cache);
rf.getClass().getMethod("setDiskStoreName", String.class).invoke(rf, "store");
rf.getClass().getMethod("setDataPolicy", dataPolicy.getClass()).invoke(rf, dataPolicy);
rf.getClass().getMethod("create", String.class).invoke(rf, regionName);
}
protected static void assertVersion(Object cache, short ordinal) throws Exception {
Class idmClass = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.distributed.internal.membership.InternalDistributedMember");
Method getDSMethod = cache.getClass().getMethod("getDistributedSystem");
getDSMethod.setAccessible(true);
Object ds = getDSMethod.invoke(cache);
Method getDistributedMemberMethod = ds.getClass().getMethod("getDistributedMember");
getDistributedMemberMethod.setAccessible(true);
Object member = getDistributedMemberMethod.invoke(ds);
Method getVersionObjectMethod = member.getClass().getMethod("getVersionObject");
getVersionObjectMethod.setAccessible(true);
Object thisVersion = getVersionObjectMethod.invoke(member);
Method getOrdinalMethod = thisVersion.getClass().getMethod("ordinal");
getOrdinalMethod.setAccessible(true);
short thisOrdinal = (Short) getOrdinalMethod.invoke(thisVersion);
if (ordinal != thisOrdinal) {
throw new Error(
"Version ordinal:" + thisOrdinal + " was not the expected ordinal of:" + ordinal);
}
}
protected static void stopCacheServers(Object cache) throws Exception {
Method getCacheServersMethod = cache.getClass().getMethod("getCacheServers");
getCacheServersMethod.setAccessible(true);
List cacheServers = (List) getCacheServersMethod.invoke(cache);
Method stopMethod = null;
for (Object cs : cacheServers) {
if (stopMethod == null) {
stopMethod = cs.getClass().getMethod("stop");
}
stopMethod.setAccessible(true);
stopMethod.invoke(cs);
}
}
protected static void closeCache(Object cache) throws Exception {
if (cache == null) {
return;
}
Method isClosedMethod = cache.getClass().getMethod("isClosed");
isClosedMethod.setAccessible(true);
boolean cacheClosed = (Boolean) isClosedMethod.invoke(cache);
if (cache != null && !cacheClosed) {
stopCacheServers(cache);
Method method = cache.getClass().getMethod("close");
method.setAccessible(true);
method.invoke(cache);
long startTime = System.currentTimeMillis();
while (!cacheClosed && System.currentTimeMillis() - startTime < 30000) {
try {
Thread.sleep(1000);
Method cacheClosedMethod = cache.getClass().getMethod("isClosed");
cacheClosedMethod.setAccessible(true);
cacheClosed = (Boolean) cacheClosedMethod.invoke(cache);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
protected static void rebalance(Object cache) throws Exception {
Method getRMMethod = cache.getClass().getMethod("getResourceManager");
getRMMethod.setAccessible(true);
Object manager = getRMMethod.invoke(cache);
Method createRebalanceFactoryMethod = manager.getClass().getMethod("createRebalanceFactory");
createRebalanceFactoryMethod.setAccessible(true);
Object rebalanceFactory = createRebalanceFactoryMethod.invoke(manager);
Method m = rebalanceFactory.getClass().getMethod("start");
m.setAccessible(true);
Object op = m.invoke(rebalanceFactory);
// Wait until the rebalance is complete
try {
Method getResultsMethod = op.getClass().getMethod("getResults");
getResultsMethod.setAccessible(true);
Object results = getResultsMethod.invoke(op);
Method getTotalTimeMethod = results.getClass().getMethod("getTotalTime");
getTotalTimeMethod.setAccessible(true);
System.out.println("Took " + getTotalTimeMethod.invoke(results) + " milliseconds\n");
Method getTotalBucketsMethod = results.getClass().getMethod("getTotalBucketTransferBytes");
getTotalBucketsMethod.setAccessible(true);
System.out.println("Transfered " + getTotalBucketsMethod.invoke(results) + "bytes\n");
} catch (Exception e) {
Thread.currentThread().interrupt();
throw e;
}
}
/**
* Starts a locator with given configuration.
*/
protected static void startLocator(final String serverHostName, final int port,
final String testName, final String locatorsString, final Properties props) throws Exception {
props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
props.setProperty(DistributionConfig.LOCATORS_NAME, locatorsString);
Logger logger = LogService.getLogger();
props.setProperty(DistributionConfig.LOG_LEVEL_NAME, logger.getLevel().name());
InetAddress bindAddr;
try {
bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
} catch (UnknownHostException uhe) {
throw new Error("While resolving bind address ", uhe);
}
File logFile = new File(testName + "-locator" + port + ".log");
Class locatorClass = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.distributed.Locator");
Method startLocatorAndDSMethod =
locatorClass.getMethod("startLocatorAndDS", int.class, File.class, InetAddress.class,
Properties.class, boolean.class, boolean.class, String.class);
startLocatorAndDSMethod.setAccessible(true);
startLocatorAndDSMethod.invoke(null, port, logFile, bindAddr, props, true, true, null);
}
protected static void startLocator(final String serverHostName, final int port, Properties props)
throws Exception {
InetAddress bindAddr = null;
try {
bindAddr = InetAddress.getByName(serverHostName);// getServerHostName(vm.getHost()));
} catch (UnknownHostException uhe) {
throw new Error("While resolving bind address ", uhe);
}
Locator.startLocatorAndDS(port, new File(""), bindAddr, props, true, true, null);
Thread.sleep(5000); // bug in 1.0 - cluster config service not immediately available
}
protected static void stopLocator() throws Exception {
Class internalLocatorClass = Thread.currentThread().getContextClassLoader()
.loadClass("org.apache.geode.distributed.internal.InternalLocator");
Method locatorMethod = internalLocatorClass.getMethod("getLocator");
locatorMethod.setAccessible(true);
Object locator = locatorMethod.invoke(null);
Method stopLocatorMethod = locator.getClass().getMethod("stop");
stopLocatorMethod.setAccessible(true);
stopLocatorMethod.invoke(locator);
}
/**
* Get the port that the standard dunit locator is listening on.
*
* @return locator address
*/
protected static String getDUnitLocatorAddress() {
return Host.getHost(0).getHostName();
}
protected Object executeDummyFunction(String regionName) {
Region region = ((InternalCache) cache).getRegion(regionName);
Object result = FunctionService.onRegion(region).execute("TestFunction").getResult();
ArrayList<Boolean> list = (ArrayList) result;
assertThat(list.get(0)).isTrue();
return result;
}
static class TestFunction implements Function, Declarable {
public void execute(FunctionContext context) {
context.getResultSender().lastResult(true);
}
public String getId() {
return getClass().getSimpleName();
}
public boolean optimizeForWrite() {
return true;
}
}
}