blob: af5f92b5c0ef5067cfa12e4e271f1ddfd2290340 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.cache.InterestResultPolicy.KEYS;
import static org.apache.geode.cache.Region.Entry;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.cache.client.internal.RegisterInterestTracker.interestListIndex;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.apache.geode.internal.cache.tier.InterestType.KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.MirrorType;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.ServerRegionProxy;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.ClientServerObserverAdapter;
import org.apache.geode.internal.cache.ClientServerObserverHolder;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.tier.InterestType;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* Tests Interest Registration Functionality
*/
@Category({ClientSubscriptionTest.class})
@SuppressWarnings({"deprecation", "rawtypes", "serial", "unchecked"})
public class HAInterestTestCase extends JUnit4DistributedTestCase {
protected static final int TIMEOUT_MILLIS = 60 * 1000;
protected static final int INTERVAL_MILLIS = 10;
protected static final String REGION_NAME = "HAInterestBaseTest_region";
protected static final String k1 = "k1";
protected static final String k2 = "k2";
protected static final String client_k1 = "client-k1";
protected static final String client_k2 = "client-k2";
protected static final String server_k1 = "server-k1";
protected static final String server_k2 = "server-k2";
protected static final String server_k1_updated = "server_k1_updated";
protected static Cache cache = null;
protected static PoolImpl pool = null;
protected static Connection conn = null;
protected static int PORT1;
protected static int PORT2;
protected static int PORT3;
protected static boolean isBeforeRegistrationCallbackCalled = false;
protected static boolean isBeforeInterestRecoveryCallbackCalled = false;
protected static boolean isAfterRegistrationCallbackCalled = false;
protected static Host host = null;
protected static VM server1 = null;
protected static VM server2 = null;
protected static VM server3 = null;
protected static volatile boolean exceptionOccurred = false;
@Override
public final void postSetUp() throws Exception {
host = Host.getHost(0);
server1 = host.getVM(0);
server2 = host.getVM(1);
server3 = host.getVM(2);
CacheServerTestUtil.disableShufflingOfEndpoints();
// start servers first
PORT1 = ((Integer) server1.invoke(() -> HAInterestTestCase.createServerCache())).intValue();
PORT2 = ((Integer) server2.invoke(() -> HAInterestTestCase.createServerCache())).intValue();
PORT3 = ((Integer) server3.invoke(() -> HAInterestTestCase.createServerCache())).intValue();
exceptionOccurred = false;
IgnoredException.addIgnoredException("java.net.ConnectException: Connection refused: connect");
}
@Override
public final void preTearDown() throws Exception {
// close the clients first
closeCache();
// then close the servers
server1.invoke(() -> HAInterestTestCase.closeCache());
server2.invoke(() -> HAInterestTestCase.closeCache());
server3.invoke(() -> HAInterestTestCase.closeCache());
CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
}
public static void closeCache() {
PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false;
PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
HAInterestTestCase.isAfterRegistrationCallbackCalled = false;
HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = false;
HAInterestTestCase.isBeforeRegistrationCallbackCalled = false;
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
cache = null;
pool = null;
conn = null;
}
/**
* Return the current primary waiting for a primary to exist.
*
* @since GemFire 5.7
*/
public static VM getPrimaryVM() {
return getPrimaryVM(null);
}
/**
* Return the current primary waiting for a primary to exist and for it not to be the oldPrimary
* (if oldPrimary is NOT null).
*
* @since GemFire 5.7
*/
public static VM getPrimaryVM(final VM oldPrimary) {
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
int primaryPort = pool.getPrimaryPort();
if (primaryPort == -1) {
return false;
}
// we have a primary
VM currentPrimary = getServerVM(primaryPort);
if (currentPrimary != oldPrimary) {
return true;
}
return false;
}
@Override
public String description() {
return "waiting for primary";
}
};
GeodeAwaitility.await().untilAsserted(wc);
int primaryPort = pool.getPrimaryPort();
assertTrue(primaryPort != -1);
VM currentPrimary = getServerVM(primaryPort);
assertTrue(currentPrimary != oldPrimary);
return currentPrimary;
}
public static VM getBackupVM() {
return getBackupVM(null);
}
public static VM getBackupVM(VM stoppedBackup) {
VM currentPrimary = getPrimaryVM(null);
if (currentPrimary != server2 && server2 != stoppedBackup) {
return server2;
} else if (currentPrimary != server3 && server3 != stoppedBackup) {
return server3;
} else if (currentPrimary != server1 && server1 != stoppedBackup) {
return server1;
} else {
fail("expected currentPrimary " + currentPrimary + " to be " + server1 + ", or " + server2
+ ", or " + server3);
return null;
}
}
/**
* Given a server vm (server1, server2, or server3) return its port.
*
* @since GemFire 5.7
*/
public static int getServerPort(VM vm) {
if (vm == server1) {
return PORT1;
} else if (vm == server2) {
return PORT2;
} else if (vm == server3) {
return PORT3;
} else {
fail("expected vm " + vm + " to be " + server1 + ", or " + server2 + ", or " + server3);
return -1;
}
}
/**
* Given a server port (PORT1, PORT2, or PORT3) return its vm.
*
* @since GemFire 5.7
*/
public static VM getServerVM(int port) {
if (port == PORT1) {
return server1;
} else if (port == PORT2) {
return server2;
} else if (port == PORT3) {
return server3;
} else {
fail("expected port " + port + " to be " + PORT1 + ", or " + PORT2 + ", or " + PORT3);
return null;
}
}
public static void verifyRefreshedEntriesFromServer() {
final Region r1 = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r1);
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
Entry re = r1.getEntry(k1);
if (re == null)
return false;
Object val = re.getValue();
return client_k1.equals(val);
}
@Override
public String description() {
return "waiting for client_k1 refresh from server";
}
};
GeodeAwaitility.await().untilAsserted(wc);
wc = new WaitCriterion() {
@Override
public boolean done() {
Entry re = r1.getEntry(k2);
if (re == null)
return false;
Object val = re.getValue();
return client_k2.equals(val);
}
@Override
public String description() {
return "waiting for client_k2 refresh from server";
}
};
GeodeAwaitility.await().untilAsserted(wc);
}
public static void verifyDeadAndLiveServers(final int expectedDeadServers,
final int expectedLiveServers) {
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return pool.getConnectedServerCount() == expectedLiveServers;
}
@Override
public String description() {
return "waiting for pool.getConnectedServerCount() == expectedLiveServer";
}
};
GeodeAwaitility.await().untilAsserted(wc);
}
public static void putK1andK2() {
Region r1 = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r1);
r1.put(k1, server_k1);
r1.put(k2, server_k2);
}
public static void setClientServerObserverForBeforeInterestRecoveryFailure() {
PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;
ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
@Override
public void beforeInterestRecovery() {
synchronized (HAInterestTestCase.class) {
Thread t = new Thread() {
@Override
public void run() {
getBackupVM().invoke(() -> HAInterestTestCase.startServer());
getPrimaryVM().invoke(() -> HAInterestTestCase.stopServer());
}
};
t.start();
try {
ThreadUtils.join(t, 30 * 1000);
} catch (Exception ignore) {
exceptionOccurred = true;
}
HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = true;
HAInterestTestCase.class.notify();
PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
}
}
});
}
public static void setClientServerObserverForBeforeInterestRecovery() {
PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;
ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
@Override
public void beforeInterestRecovery() {
synchronized (HAInterestTestCase.class) {
Thread t = new Thread() {
@Override
public void run() {
Region r1 = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r1);
r1.put(k1, server_k1_updated);
}
};
t.start();
HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = true;
HAInterestTestCase.class.notify();
PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
}
}
});
}
public static void waitForBeforeInterestRecoveryCallBack() throws InterruptedException {
assertNotNull(cache);
synchronized (HAInterestTestCase.class) {
while (!isBeforeInterestRecoveryCallbackCalled) {
HAInterestTestCase.class.wait();
}
}
}
public static void setClientServerObserverForBeforeRegistration(final VM vm) {
PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = true;
ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
@Override
public void beforeInterestRegistration() {
synchronized (HAInterestTestCase.class) {
vm.invoke(() -> HAInterestTestCase.startServer());
HAInterestTestCase.isBeforeRegistrationCallbackCalled = true;
HAInterestTestCase.class.notify();
PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
}
}
});
}
public static void waitForBeforeRegistrationCallback() throws InterruptedException {
assertNotNull(cache);
synchronized (HAInterestTestCase.class) {
while (!isBeforeRegistrationCallbackCalled) {
HAInterestTestCase.class.wait();
}
}
}
public static void setClientServerObserverForAfterRegistration(final VM vm) {
PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = true;
ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
@Override
public void afterInterestRegistration() {
synchronized (HAInterestTestCase.class) {
vm.invoke(() -> HAInterestTestCase.startServer());
HAInterestTestCase.isAfterRegistrationCallbackCalled = true;
HAInterestTestCase.class.notify();
PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
}
}
});
}
public static void waitForAfterRegistrationCallback() throws InterruptedException {
assertNotNull(cache);
if (!isAfterRegistrationCallbackCalled) {
synchronized (HAInterestTestCase.class) {
while (!isAfterRegistrationCallbackCalled) {
HAInterestTestCase.class.wait();
}
}
}
}
public static void unSetClientServerObserverForRegistrationCallback() {
synchronized (HAInterestTestCase.class) {
PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
HAInterestTestCase.isBeforeRegistrationCallbackCalled = false;
HAInterestTestCase.isAfterRegistrationCallbackCalled = false;
}
}
public static void verifyDispatcherIsAlive() {
assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size());
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return cache.getCacheServers().size() == 1;
}
@Override
public String description() {
return "waiting for cache.getCacheServers().size() == 1";
}
};
GeodeAwaitility.await().untilAsserted(wc);
CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
assertNotNull(bs);
assertNotNull(bs.getAcceptor());
assertNotNull(bs.getAcceptor().getCacheClientNotifier());
final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
wc = new WaitCriterion() {
@Override
public boolean done() {
return ccn.getClientProxies().size() > 0;
}
@Override
public String description() {
return "waiting for ccn.getClientProxies().size() > 0";
}
};
GeodeAwaitility.await().untilAsserted(wc);
wc = new WaitCriterion() {
Iterator iter_prox;
CacheClientProxy proxy;
@Override
public boolean done() {
iter_prox = ccn.getClientProxies().iterator();
if (iter_prox.hasNext()) {
proxy = (CacheClientProxy) iter_prox.next();
return proxy._messageDispatcher.isAlive();
} else {
return false;
}
}
@Override
public String description() {
return "waiting for CacheClientProxy _messageDispatcher to be alive";
}
};
GeodeAwaitility.await().untilAsserted(wc);
}
public static void verifyDispatcherIsNotAlive() {
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return cache.getCacheServers().size() == 1;
}
@Override
public String description() {
return "cache.getCacheServers().size() == 1";
}
};
GeodeAwaitility.await().untilAsserted(wc);
CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
assertNotNull(bs);
assertNotNull(bs.getAcceptor());
assertNotNull(bs.getAcceptor().getCacheClientNotifier());
final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
wc = new WaitCriterion() {
@Override
public boolean done() {
return ccn.getClientProxies().size() > 0;
}
@Override
public String description() {
return "waiting for ccn.getClientProxies().size() > 0";
}
};
GeodeAwaitility.await().untilAsserted(wc);
Iterator iter_prox = ccn.getClientProxies().iterator();
if (iter_prox.hasNext()) {
CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
assertFalse("Dispatcher on secondary should not be alive",
proxy._messageDispatcher.isAlive());
}
}
public static void createEntriesK1andK2OnServer() {
Region r1 = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r1);
if (!r1.containsKey(k1)) {
r1.create(k1, server_k1);
}
if (!r1.containsKey(k2)) {
r1.create(k2, server_k2);
}
assertEquals(r1.getEntry(k1).getValue(), server_k1);
assertEquals(r1.getEntry(k2).getValue(), server_k2);
}
public static void createEntriesK1andK2() {
Region r1 = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r1);
if (!r1.containsKey(k1)) {
r1.create(k1, client_k1);
}
if (!r1.containsKey(k2)) {
r1.create(k2, client_k2);
}
assertEquals(r1.getEntry(k1).getValue(), client_k1);
assertEquals(r1.getEntry(k2).getValue(), client_k2);
}
public static void createServerEntriesK1andK2() {
Region r1 = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r1);
if (!r1.containsKey(k1)) {
r1.create(k1, server_k1);
}
if (!r1.containsKey(k2)) {
r1.create(k2, server_k2);
}
assertEquals(r1.getEntry(k1).getValue(), server_k1);
assertEquals(r1.getEntry(k2).getValue(), server_k2);
}
public static void registerK1AndK2() {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
List list = new ArrayList();
list.add(k1);
list.add(k2);
r.registerInterest(list, InterestResultPolicy.KEYS_VALUES);
}
public static void reRegisterK1AndK2() {
Region r = cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
List list = new ArrayList();
list.add(k1);
list.add(k2);
r.registerInterest(list);
}
public static void startServer() throws IOException {
Cache c = CacheFactory.getAnyInstance();
assertEquals("More than one BridgeServer", 1, c.getCacheServers().size());
CacheServerImpl bs = (CacheServerImpl) c.getCacheServers().iterator().next();
assertNotNull(bs);
bs.start();
}
public static void stopServer() {
assertEquals("More than one BridgeServer", 1, cache.getCacheServers().size());
CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
assertNotNull(bs);
bs.stop();
}
public static void stopPrimaryAndRegisterK1AndK2AndVerifyResponse() {
LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return pool.getConnectedServerCount() == 3;
}
@Override
public String description() {
return "connected server count never became 3";
}
};
GeodeAwaitility.await().untilAsserted(wc);
// close primaryEP
getPrimaryVM().invoke(() -> stopServer());
List list = new ArrayList();
list.add(k1);
list.add(k2);
List serverKeys = srp.registerInterest(list, KEY, KEYS, false,
r.getAttributes().getDataPolicy().ordinal);
assertNotNull(serverKeys);
List resultKeys = (List) serverKeys.get(0);
assertEquals(2, resultKeys.size());
assertTrue(resultKeys.contains(k1));
assertTrue(resultKeys.contains(k2));
}
public static void stopPrimaryAndUnregisterRegisterK1() {
LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return pool.getConnectedServerCount() == 3;
}
@Override
public String description() {
return "connected server count never became 3";
}
};
GeodeAwaitility.await().untilAsserted(wc);
// close primaryEP
getPrimaryVM().invoke(() -> stopServer());
List list = new ArrayList();
list.add(k1);
srp.unregisterInterest(list, KEY, false, false);
}
public static void stopBothPrimaryAndSecondaryAndRegisterK1AndK2AndVerifyResponse() {
LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return pool.getConnectedServerCount() == 3;
}
@Override
public String description() {
return "connected server count never became 3";
}
};
GeodeAwaitility.await().untilAsserted(wc);
// close primaryEP
VM backup = getBackupVM();
getPrimaryVM().invoke(() -> stopServer());
// close secondary
backup.invoke(() -> stopServer());
List list = new ArrayList();
list.add(k1);
list.add(k2);
List serverKeys = srp.registerInterest(list, KEY, KEYS, false,
r.getAttributes().getDataPolicy().ordinal);
assertNotNull(serverKeys);
List resultKeys = (List) serverKeys.get(0);
assertEquals(2, resultKeys.size());
assertTrue(resultKeys.contains(k1));
assertTrue(resultKeys.contains(k2));
}
/**
* returns the secondary that was stopped
*/
public static VM stopSecondaryAndRegisterK1AndK2AndVerifyResponse() {
LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return pool.getConnectedServerCount() == 3;
}
@Override
public String description() {
return "Never got three connected servers";
}
};
GeodeAwaitility.await().untilAsserted(wc);
// close secondary EP
VM result = getBackupVM();
result.invoke(() -> stopServer());
List list = new ArrayList();
list.add(k1);
list.add(k2);
List serverKeys = srp.registerInterest(list, KEY, KEYS, false,
r.getAttributes().getDataPolicy().ordinal);
assertNotNull(serverKeys);
List resultKeys = (List) serverKeys.get(0);
assertEquals(2, resultKeys.size());
assertTrue(resultKeys.contains(k1));
assertTrue(resultKeys.contains(k2));
return result;
}
/**
* returns the secondary that was stopped
*/
public static VM stopSecondaryAndUNregisterK1() {
LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return pool.getConnectedServerCount() == 3;
}
@Override
public String description() {
return "connected server count never became 3";
}
};
GeodeAwaitility.await().untilAsserted(wc);
// close secondary EP
VM result = getBackupVM();
result.invoke(() -> stopServer());
List list = new ArrayList();
list.add(k1);
srp.unregisterInterest(list, KEY, false, false);
return result;
}
public static void registerK1AndK2OnPrimaryAndSecondaryAndVerifyResponse() {
ServerLocation primary = pool.getPrimary();
ServerLocation secondary = (ServerLocation) pool.getRedundants().get(0);
LocalRegion r = (LocalRegion) cache.getRegion(SEPARATOR + REGION_NAME);
assertNotNull(r);
ServerRegionProxy srp = new ServerRegionProxy(r);
List list = new ArrayList();
list.add(k1);
list.add(k2);
// Primary server
List serverKeys1 = srp.registerInterestOn(primary, list, InterestType.KEY,
InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
assertNotNull(serverKeys1);
// expect serverKeys in response from primary
List resultKeys = (List) serverKeys1.get(0);
assertEquals(2, resultKeys.size());
assertTrue(resultKeys.contains(k1));
assertTrue(resultKeys.contains(k2));
// Secondary server
List serverKeys2 = srp.registerInterestOn(secondary, list, InterestType.KEY,
InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
// if the list is null then it is empty
if (serverKeys2 != null) {
// no serverKeys in response from secondary
assertTrue(serverKeys2.isEmpty());
}
}
public static void verifyInterestRegistration() {
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return cache.getCacheServers().size() == 1;
}
@Override
public String description() {
return "waiting for cache.getCacheServers().size() == 1";
}
};
GeodeAwaitility.await().untilAsserted(wc);
CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
assertNotNull(bs);
assertNotNull(bs.getAcceptor());
assertNotNull(bs.getAcceptor().getCacheClientNotifier());
final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
wc = new WaitCriterion() {
@Override
public boolean done() {
return ccn.getClientProxies().size() > 0;
}
@Override
public String description() {
return "waiting for ccn.getClientProxies().size() > 0";
}
};
GeodeAwaitility.await().untilAsserted(wc);
Iterator iter_prox = ccn.getClientProxies().iterator();
if (iter_prox.hasNext()) {
final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
wc = new WaitCriterion() {
@Override
public boolean done() {
Set keysMap = (Set) ccp.cils[interestListIndex]
.getProfile(SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
return keysMap != null && keysMap.size() == 2;
}
@Override
public String description() {
return "waiting for keys of interest to include 2 keys";
}
};
GeodeAwaitility.await().untilAsserted(wc);
Set keysMap = (Set) ccp.cils[interestListIndex]
.getProfile(SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
assertNotNull(keysMap);
assertEquals(2, keysMap.size());
assertTrue(keysMap.contains(k1));
assertTrue(keysMap.contains(k2));
}
}
public static void verifyInterestUNRegistration() {
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return cache.getCacheServers().size() == 1;
}
@Override
public String description() {
return "waiting for cache.getCacheServers().size() == 1";
}
};
GeodeAwaitility.await().untilAsserted(wc);
CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
assertNotNull(bs);
assertNotNull(bs.getAcceptor());
assertNotNull(bs.getAcceptor().getCacheClientNotifier());
final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
wc = new WaitCriterion() {
@Override
public boolean done() {
return ccn.getClientProxies().size() > 0;
}
@Override
public String description() {
return "waiting for ccn.getClientProxies().size() > 0";
}
};
GeodeAwaitility.await().untilAsserted(wc);
Iterator iter_prox = ccn.getClientProxies().iterator();
if (iter_prox.hasNext()) {
final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
wc = new WaitCriterion() {
@Override
public boolean done() {
Set keysMap = (Set) ccp.cils[interestListIndex]
.getProfile(SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
return keysMap != null;
}
@Override
public String description() {
return "waiting for keys of interest to not be null";
}
};
GeodeAwaitility.await().untilAsserted(wc);
Set keysMap = (Set) ccp.cils[interestListIndex]
.getProfile(SEPARATOR + REGION_NAME).getKeysOfInterestFor(ccp.getProxyID());
assertNotNull(keysMap);
assertEquals(1, keysMap.size());
assertFalse(keysMap.contains(k1));
assertTrue(keysMap.contains(k2));
}
}
private void createCache(Properties props) throws Exception {
DistributedSystem ds = getSystem(props);
assertNotNull(ds);
ds.disconnect();
ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
public static void createClientPoolCache(String testName, String host) throws Exception {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
new HAInterestTestCase().createCache(props);
CacheServerTestUtil.disableShufflingOfEndpoints();
PoolImpl p;
try {
p = (PoolImpl) PoolManager.createFactory().addServer(host, PORT1).addServer(host, PORT2)
.addServer(host, PORT3).setSubscriptionEnabled(true).setSubscriptionRedundancy(-1)
.setReadTimeout(10000).setPingInterval(1000)
// retryInterval should be more so that only registerInterste thread
// will initiate failover
// .setRetryInterval(20000)
.create("HAInterestBaseTestPool");
} finally {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(true);
factory.setPoolName(p.getName());
cache.createRegion(REGION_NAME, factory.create());
pool = p;
conn = pool.acquireConnection();
assertNotNull(conn);
}
public static void createClientPoolCacheWithSmallRetryInterval(String testName, String host)
throws Exception {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
new HAInterestTestCase().createCache(props);
CacheServerTestUtil.disableShufflingOfEndpoints();
PoolImpl p;
try {
p = (PoolImpl) PoolManager.createFactory().addServer(host, PORT1).addServer(host, PORT2)
.setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(10000)
.setSocketBufferSize(32768).setMinConnections(6).setPingInterval(200)
// .setRetryInterval(200)
// retryAttempts 3
.create("HAInterestBaseTestPool");
} finally {
CacheServerTestUtil.enableShufflingOfEndpoints();
}
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(true);
factory.setPoolName(p.getName());
cache.createRegion(REGION_NAME, factory.create());
pool = p;
conn = pool.acquireConnection();
assertNotNull(conn);
}
public static void createClientPoolCacheConnectionToSingleServer(String testName, String hostName)
throws Exception {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
new HAInterestTestCase().createCache(props);
PoolImpl p = (PoolImpl) PoolManager.createFactory().addServer(hostName, PORT1)
.setSubscriptionEnabled(true).setSubscriptionRedundancy(-1).setReadTimeout(10000)
// .setRetryInterval(20)
.create("HAInterestBaseTestPool");
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(true);
factory.setPoolName(p.getName());
cache.createRegion(REGION_NAME, factory.create());
pool = p;
conn = pool.acquireConnection();
assertNotNull(conn);
}
public static Integer createServerCache() throws Exception {
new HAInterestTestCase().createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEnableBridgeConflation(true);
factory.setMirrorType(MirrorType.KEYS_VALUES);
factory.setConcurrencyChecksEnabled(true);
cache.createRegion(REGION_NAME, factory.create());
CacheServer server = cache.addCacheServer();
int port = getRandomAvailableTCPPort();
server.setPort(port);
server.setMaximumTimeBetweenPings(180000);
// ensures updates to be sent instead of invalidations
server.setNotifyBySubscription(true);
server.start();
return new Integer(server.getPort());
}
public static Integer createServerCacheWithLocalRegion() throws Exception {
new HAInterestTestCase().createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.LOCAL);
factory.setConcurrencyChecksEnabled(true);
RegionAttributes attrs = factory.create();
cache.createRegion(REGION_NAME, attrs);
CacheServer server = cache.addCacheServer();
int port = getRandomAvailableTCPPort();
server.setPort(port);
// ensures updates to be sent instead of invalidations
server.setNotifyBySubscription(true);
server.setMaximumTimeBetweenPings(180000);
server.start();
return new Integer(server.getPort());
}
}