blob: ee486645ac9b649104177c779c25d69370a6476a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.ha;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.stream.IntStream;
import org.junit.Test;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.client.ClientRegionShortcut;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
import org.apache.geode.test.dunit.DUnitEnv;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
public class PreferSerializedHARegionQueueTest extends JUnit4CacheTestCase {
private static final long serialVersionUID = 1L;
@Test
public void copyingHARegionQueueShouldNotThrowException() throws Exception {
Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
VM vm2 = host.getVM(2);
VM vm3 = host.getVM(3);
VM vm4 = host.getVM(4);
VM vm5 = host.getVM(5);
VM vm6 = host.getVM(6);
// Set prefer serialized
vm1.invoke(() -> setPreferSerialized());
vm2.invoke(() -> setPreferSerialized());
vm3.invoke(() -> setPreferSerialized());
vm4.invoke(() -> setPreferSerialized());
String regionName = getTestMethodName() + "_PR";
try {
// Initialize initial cache servers
vm1.invoke(() -> initializeServer(regionName));
vm2.invoke(() -> initializeServer(regionName));
// Create register interest client
vm5.invoke(() -> createClient(regionName, true, 1, Integer.MAX_VALUE));
// Wait for both primary and secondary servers to establish proxies
vm1.invoke(() -> waitForCacheClientProxies(1));
vm2.invoke(() -> waitForCacheClientProxies(1));
// Create client loader and load entries
int numPuts = 10;
vm6.invoke(
() -> createClient(regionName, false, 0, PoolFactory.DEFAULT_SUBSCRIPTION_ACK_INTERVAL));
vm6.invoke(() -> {
Region region = getCache().getRegion(regionName);
IntStream.range(0, numPuts).forEach(i -> region.put(i, i));
});
// Verify HARegion sizes
vm1.invoke(() -> waitForHARegionSize(numPuts));
vm2.invoke(() -> waitForHARegionSize(numPuts));
// Initialize next cache server
vm3.invoke(() -> initializeServer(regionName));
// Stop one of the original cache servers
vm1.invoke(() -> closeCache());
// Wait for new cache server to establish proxies
vm3.invoke(() -> waitForCacheClientProxies(1));
// Verify HARegion size
vm3.invoke(() -> waitForHARegionSize(numPuts));
// Initialize final cache server
vm4.invoke(() -> initializeServer(regionName));
// Stop other original cache server
vm2.invoke(() -> closeCache());
// Wait for new cache server to establish proxies
vm4.invoke(() -> waitForCacheClientProxies(1));
// Verify HARegion size
vm4.invoke(() -> waitForHARegionSize(numPuts));
// Stop the clients to prevent suspect strings when the servers are stopped
vm5.invoke(() -> closeCache());
vm6.invoke(() -> closeCache());
} finally {
// Clear prefer serialized
vm1.invoke(() -> clearPreferSerialized());
vm2.invoke(() -> clearPreferSerialized());
vm3.invoke(() -> clearPreferSerialized());
vm4.invoke(() -> clearPreferSerialized());
}
}
public void initializeServer(String regionName) throws IOException {
getCache().createRegionFactory(RegionShortcut.PARTITION).create(regionName);
final CacheServer cacheServer = getCache().addCacheServer();
cacheServer.setPort(0);
cacheServer.start();
}
public void createClient(String regionName, boolean subscriptionEnabled,
int subscriptionRedundancy, int subscriptionAckInterval) {
ClientCacheFactory clientCacheFactory =
new ClientCacheFactory().setPoolSubscriptionAckInterval(subscriptionAckInterval)
.setPoolSubscriptionEnabled(subscriptionEnabled)
.setPoolSubscriptionRedundancy(subscriptionRedundancy)
.addPoolLocator("localhost", DUnitEnv.get().getLocatorPort());
ClientCache cache = getClientCache(clientCacheFactory);
Region region = cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
if (subscriptionEnabled) {
region.registerInterest("ALL_KEYS");
}
}
public static void setPreferSerialized() {
System.setProperty("gemfire.PREFER_SERIALIZED", "true");
}
public static void clearPreferSerialized() {
System.clearProperty("gemfire.PREFER_SERIALIZED");
}
public void waitForCacheClientProxies(final int expectedSize) {
final CacheServer cs = getCache().getCacheServers().iterator().next();
await()
.untilAsserted(() -> assertEquals(expectedSize, cs.getAllClientSessions().size()));
}
public void waitForHARegionSize(final int expectedSize) {
final CacheServer cs = getCache().getCacheServers().iterator().next();
final CacheClientProxy ccp = (CacheClientProxy) cs.getAllClientSessions().iterator().next();
await()
.untilAsserted(() -> assertEquals(expectedSize, getHAEventsCount(ccp)));
}
private static int getHAEventsCount(CacheClientProxy ccp) {
Region haRegion = ccp.getHARegion();
if (haRegion == null) {
return 0;
}
int count = 0;
for (Object value : haRegion.values()) {
if (value instanceof HAEventWrapper) {
count += 1;
}
}
return count;
}
}