blob: 522f8bcdd0d138dda6456abac7b7648f377f6a4f [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.ha;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.AttributesMutator;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.MirrorType;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.RegionEvent;
import com.gemstone.gemfire.cache.Scope;
import com.gemstone.gemfire.cache.client.PoolManager;
import com.gemstone.gemfire.cache.client.internal.PoolImpl;
import com.gemstone.gemfire.cache.util.BridgeServer;
import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
import com.gemstone.gemfire.cache30.BridgeTestCase;
import com.gemstone.gemfire.distributed.DistributedSystem;
import com.gemstone.gemfire.internal.AvailablePort;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
import com.gemstone.gemfire.cache.client.internal.Connection;
import com.gemstone.gemfire.cache.client.internal.QueueStateImpl.SequenceIdAndExpirationObject;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.VM;
/**
* This test verifies that eventId, while being sent across the network ( client
* to server, server to client and peer to peer) , goes as optimized byte-array.
* For client to server messages, the membership id part of event-id is not need
* to be sent with each event. Also, the threadId and sequenceId need not be
* sent as long if their value is small. This test has two servers and two
* clients , each connected to one server. The events with event-ids having
* specific values for thread-id and sequence-id are generated by client-1 and
* sent to server-1 and then to server-2 via p2p and then finally to client-2.
* It is verified that client-2 recieves the same values for thread-id and
* sequence-id.
*
* @author Dinesh Patel
*
*/
public class EventIdOptimizationDUnitTest extends DistributedTestCase
{
/** Cache-server1 */
VM server1 = null;
/** Cache-server2 */
VM server2 = null;
/** Client1 , connected to Cache-server1 */
VM client1 = null;
/** Client2 , connected to Cache-server2 */
VM client2 = null;
/** The long id (threadId or sequenceId) having value equivalent to byte */
private static final long ID_VALUE_BYTE = Byte.MAX_VALUE;
/** The long id (threadId or sequenceId) having value equivalent to short */
private static final long ID_VALUE_SHORT = Short.MAX_VALUE;
/** The long id (threadId or sequenceId) having value equivalent to int */
private static final long ID_VALUE_INT = Integer.MAX_VALUE;
/** The long id (threadId or sequenceId) having value equivalent to long */
private static final long ID_VALUE_LONG = Long.MAX_VALUE;
/** Name of the test region */
private static final String REGION_NAME = "EventIdOptimizationDUnitTest_region";
/** The cache instance for test cases */
protected static Cache cache = null;
/**
* Connection proxy object to get connection for performing events that will
* have specific eventIds
*/
private static PoolImpl pool = null;
/** Boolean to indicate the client to proceed for validation */
private static volatile boolean proceedForValidation = false;
/** Boolean to propagate the failure in listener to the client */
private static volatile boolean validationFailed = false;
/** StringBuffer to hold the failure messages in client listener */
static StringBuffer failureMsg = new StringBuffer();
/** The last key for operations, to notify for proceeding to validation */
private static final String LAST_KEY = "LAST_KEY";
/**
* The eventID for the last key, used to identify the last event so that
* client can proceed for validation
*/
private static final EventID eventIdForLastKey = new EventID(new byte[] { 1,
2 }, 3, 4);
/**
* An array of eventIds having possible combinations of threadId and
* sequenceId values
*/
private static final EventID[] eventIds = new EventID[] {
new EventID(new byte[] { 1, 1 }, ID_VALUE_BYTE, ID_VALUE_BYTE),
new EventID(new byte[] { 1, 1 }, ID_VALUE_BYTE, ID_VALUE_SHORT),
new EventID(new byte[] { 1, 1 }, ID_VALUE_BYTE, ID_VALUE_INT),
new EventID(new byte[] { 1, 1 }, ID_VALUE_BYTE, ID_VALUE_LONG),
new EventID(new byte[] { 1, 1 }, ID_VALUE_SHORT, ID_VALUE_BYTE),
new EventID(new byte[] { 1, 1 }, ID_VALUE_SHORT, ID_VALUE_SHORT),
new EventID(new byte[] { 1, 1 }, ID_VALUE_SHORT, ID_VALUE_INT),
new EventID(new byte[] { 1, 1 }, ID_VALUE_SHORT, ID_VALUE_LONG),
new EventID(new byte[] { 1, 1 }, ID_VALUE_INT, ID_VALUE_BYTE),
new EventID(new byte[] { 1, 1 }, ID_VALUE_INT, ID_VALUE_SHORT),
new EventID(new byte[] { 1, 1 }, ID_VALUE_INT, ID_VALUE_INT),
new EventID(new byte[] { 1, 1 }, ID_VALUE_INT, ID_VALUE_LONG),
new EventID(new byte[] { 1, 1 }, ID_VALUE_LONG, ID_VALUE_BYTE),
new EventID(new byte[] { 1, 1 }, ID_VALUE_LONG, ID_VALUE_SHORT),
new EventID(new byte[] { 1, 1 }, ID_VALUE_LONG, ID_VALUE_INT),
new EventID(new byte[] { 1, 1 }, ID_VALUE_LONG, ID_VALUE_LONG) };
/** Constructor */
public EventIdOptimizationDUnitTest(String name) {
super(name);
}
/**
* Sets up the cache-servers and clients for the test
*
* @throws Exception -
* thrown in any problem occurs in setUp
*/
public void setUp() throws Exception {
super.setUp();
disconnectAllFromDS();
final Host host = Host.getHost(0);
server1 = host.getVM(0);
server2 = host.getVM(1);
client1 = host.getVM(2);
client2 = host.getVM(3);
int PORT1 = ((Integer)server1.invoke(EventIdOptimizationDUnitTest.class,
"createServerCache")).intValue();
int PORT2 = ((Integer)server2.invoke(EventIdOptimizationDUnitTest.class,
"createServerCache")).intValue();
client1.invoke(EventIdOptimizationDUnitTest.class, "createClientCache1",
new Object[] { getServerHostName(host), new Integer(PORT1) });
client2.invoke(EventIdOptimizationDUnitTest.class, "createClientCache2",
new Object[] { getServerHostName(host), new Integer(PORT2) });
}
/**
* Creates the cache
*
* @param props -
* distributed system props
* @throws Exception -
* thrown in any problem occurs in creating cache
*/
private void createCache(Properties props) throws Exception
{
DistributedSystem ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
/** Creates cache and starts the bridge-server */
public static Integer createServerCache() throws Exception
{
new EventIdOptimizationDUnitTest("temp").createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setMirrorType(MirrorType.KEYS_VALUES);
RegionAttributes attrs = factory.create();
cache.createRegion(REGION_NAME, attrs);
// create multiple dummy regions to use them in destroyRegion case for
// testing eventIDs
for (int i = 0; i < eventIds.length; i++) {
cache.createRegion(REGION_NAME + i, attrs);
}
BridgeServer server = cache.addBridgeServer();
assertNotNull(server);
int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
server.setPort(port);
server.setNotifyBySubscription(true);
server.start();
return new Integer(server.getPort());
}
/**
* Creates the client cache1, connected to server1
*
* @param port -
* bridgeserver port
* @throws Exception -
* thrown if any problem occurs in setting up the client
*/
public static void createClientCache1(String hostName, Integer port) throws Exception
{
Properties props = new Properties();
props.setProperty("mcast-port", "0");
props.setProperty("locators", "");
new EventIdOptimizationDUnitTest("temp").createCache(props);
AttributesFactory factory = new AttributesFactory();
BridgeTestCase.configureConnectionPool(factory, hostName, port.intValue(),-1, true, -1, 2, null);
final BridgeServer bs1 = cache.addBridgeServer();
bs1.setPort(port.intValue());
pool = (PoolImpl)PoolManager.find("testPool");
}
/**
* Creates the client cache2, connected to server3
*
* @param port -
* bridgeserver port
* @throws Exception -
* thrown if any problem occurs in setting up the client
*/
public static void createClientCache2(String hostName, Integer port) throws Exception
{
Properties props = new Properties();
props.setProperty("mcast-port", "0");
props.setProperty("locators", "");
new EventIdOptimizationDUnitTest("temp").createCache(props);
AttributesFactory factory = new AttributesFactory();
BridgeTestCase.configureConnectionPool(factory, hostName, port.intValue(),-1, true, -1, 2, null);
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.addCacheListener(new CacheListenerAdapter() {
public void afterCreate(EntryEvent event)
{
String key = (String)event.getKey();
validateEventsAtReceivingClientListener(key);
}
public void afterDestroy(EntryEvent event)
{
String key = (String)event.getKey();
validateEventsAtReceivingClientListener(key);
}
public void afterRegionDestroy(RegionEvent event)
{
validateEventsAtReceivingClientListener(" <destroyRegion Event> ");
}
public void afterRegionClear(RegionEvent event)
{
validateEventsAtReceivingClientListener(" <clearRegion Event> ");
}
});
RegionAttributes attrs = factory.create();
Region region = cache.createRegion(REGION_NAME, attrs);
region.registerInterest("ALL_KEYS");
for (int i = 0; i < eventIds.length; i++) {
region = cache.createRegion(REGION_NAME + i, attrs);
region.registerInterest("ALL_KEYS");
}
pool = (PoolImpl)PoolManager.find("testPool");
}
/**
* Generates events having specific values of threadId and sequenceId, via put
* operation through connection object
*
* @throws Exception -
* thrown if any problem occurs in put operation
*/
public static void generateEventsByPutOperation() throws Exception
{
Connection connection = pool.acquireConnection();
String regionName = Region.SEPARATOR + REGION_NAME;
ServerRegionProxy srp = new ServerRegionProxy(regionName, pool);
for (int i = 0; i < eventIds.length; i++) {
srp.putOnForTestsOnly(connection, "KEY-" + i, "VAL-" + i, eventIds[i], null);
}
srp.putOnForTestsOnly(connection, LAST_KEY, "LAST_VAL", eventIdForLastKey, null);
}
/**
* Generates events having specific values of threadId and sequenceId, via
* destroyEntry operation through connection object
*
* @throws Exception -
* thrown if any problem occurs in destroyEntry operation
*/
public static void generateEventsByDestroyEntryOperation() throws Exception
{
Connection connection = pool.acquireConnection();
String regionName = Region.SEPARATOR + REGION_NAME;
ServerRegionProxy srp = new ServerRegionProxy(regionName, pool);
for (int i = 0; i < eventIds.length; i++) {
srp.destroyOnForTestsOnly(connection, "KEY-" + i, null, Operation.DESTROY, new EntryEventImpl(eventIds[i]), null);
}
srp.destroyOnForTestsOnly(connection, LAST_KEY, null, Operation.DESTROY, new EntryEventImpl(eventIdForLastKey), null);
}
/**
* Generates events having specific values of threadId and sequenceId, via
* destroyRegionOperation through connection object
*
* @throws Exception -
* thrown if any problem occurs in destroyRegionOperation
*/
public static void generateEventsByDestroyRegionOperation() throws Exception
{
Connection connection = pool.acquireConnection();
String regionName = Region.SEPARATOR + REGION_NAME;
for (int i = 0; i < 1; i++) {
ServerRegionProxy srp = new ServerRegionProxy(regionName+i, pool);
srp.destroyRegionOnForTestsOnly(connection, eventIds[i], null);
}
{
ServerRegionProxy srp = new ServerRegionProxy(regionName, pool);
srp.destroyRegionOnForTestsOnly(connection, eventIdForLastKey, null);
}
}
/**
* Generates events having specific values of threadId and sequenceId, via
* clearRegionOperation through connection object
*
* @throws Exception -
* thrown if any problem occurs in clearRegionOperation
*/
public static void generateEventsByClearRegionOperation() throws Exception
{
Connection connection = pool.acquireConnection();
String regionName = Region.SEPARATOR + REGION_NAME;
ServerRegionProxy srp = new ServerRegionProxy(regionName, pool);
for (int i = 0; i < eventIds.length; i++) {
srp.clearOnForTestsOnly(connection, eventIds[i], null);
}
srp.clearOnForTestsOnly(connection, eventIdForLastKey, null);
}
/**
* Generates events having specific values of threadId and sequenceId from
* client1 via put operation and verifies that the values received on client2
* match with those sent from client1.
*
* @throws Exception -
* thrown if any exception occurs in test
*/
public void testEventIdOptimizationByPutOperation() throws Exception
{
client1.invoke(EventIdOptimizationDUnitTest.class,
"generateEventsByPutOperation");
client2.invoke(EventIdOptimizationDUnitTest.class,
"verifyEventIdsOnClient2");
}
/**
* Generates events having specific values of threadId and sequenceId from
* client1 via destroyEntry operation and verifies that the values received on
* client2 match with those sent from client1.
*
* @throws Exception -
* thrown if any exception occurs in test
*/
public void testEventIdOptimizationByDestroyEntryOperation() throws Exception
{
client1.invoke(EventIdOptimizationDUnitTest.class,
"generateEventsByDestroyEntryOperation");
client2.invoke(EventIdOptimizationDUnitTest.class,
"verifyEventIdsOnClient2");
}
/**
* Generates events having specific values of threadId and sequenceId from
* client1 via destroyRegion operation and verifies that the values received
* on client2 match with those sent from client1.
*
* @throws Exception -
* thrown if any exception occurs in test
*/
public void testEventIdOptimizationByDestroyRegionOperation()
throws Exception
{
client1.invoke(EventIdOptimizationDUnitTest.class,
"generateEventsByDestroyRegionOperation");
client2.invoke(EventIdOptimizationDUnitTest.class,
"verifyEventIdsOnClient2");
}
/**
* Generates events having specific values of threadId and sequenceId from
* client1 via clearRegion operation and verifies that the values received on
* client2 match with those sent from client1.
*
* @throws Exception -
* thrown if any exception occurs in test
*/
public void testEventIdOptimizationByClearRegionOperation() throws Exception
{
client1.invoke(EventIdOptimizationDUnitTest.class,
"generateEventsByClearRegionOperation");
client2.invoke(EventIdOptimizationDUnitTest.class,
"verifyEventIdsOnClient2");
}
/**
* Waits for the listener to receive all events and validates that no
* exception occured in client
*/
public static void verifyEventIdsOnClient2()
{
if (!proceedForValidation) {
synchronized (EventIdOptimizationDUnitTest.class) {
if (!proceedForValidation)
try {
getLogWriter().info(
"Client2 going in wait before starting validation");
EventIdOptimizationDUnitTest.class.wait();
}
catch (InterruptedException e) {
fail("interrupted");
}
}
}
getLogWriter().info("Starting validation on client2");
if (validationFailed) {
fail("\n The following eventIds recieved by client2 were not present in the eventId array sent by client1 \n"
+ failureMsg);
}
getLogWriter().info("Validation complete on client2, goin to unregister listeners");
Region region = cache.getRegion(Region.SEPARATOR + REGION_NAME);
if (region != null && !region.isDestroyed()) {
try {
AttributesMutator mutator = region.getAttributesMutator();
mutator.initCacheListeners(null);
}
catch (RegionDestroyedException ignore) {
}
}
for (int i = 0; i < eventIds.length; i++) {
region = cache.getRegion(Region.SEPARATOR + REGION_NAME + i);
if (region != null && !region.isDestroyed()) {
try {
AttributesMutator mutator = region.getAttributesMutator();
mutator.initCacheListeners(null);
}
catch (RegionDestroyedException ignore) {
}
}
}
getLogWriter().info("Test completed, Unregistered the listeners");
}
/**
* Closes the cache
*
*/
public static void closeCache()
{
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
/**
* Closes the caches on clients and servers
*/
public void tearDown2() throws Exception
{
// close client
client1.invoke(EventIdOptimizationDUnitTest.class, "closeCache");
client2.invoke(EventIdOptimizationDUnitTest.class, "closeCache");
// close server
server1.invoke(EventIdOptimizationDUnitTest.class, "closeCache");
server2.invoke(EventIdOptimizationDUnitTest.class, "closeCache");
}
/**
* Function to assert that the ThreadIdtoSequence id Map is not Null and has
* only one entry.
*
* @return - eventID object from the ThreadIdToSequenceIdMap
*/
public static Object assertThreadIdToSequenceIdMapHasEntryId()
{
Map map = pool.getThreadIdToSequenceIdMap();
assertNotNull(map);
// The map size can now be 1 or 2 because of the server thread putting
// the marker in the queue. If it is 2, the first entry is the server
// thread; the second is the client thread. If it is 1, the entry is the
// client thread. The size changes because of the map.clear call below.
assertTrue(map.size() != 0);
// Set the entry to the last entry
Map.Entry entry = null;
for (Iterator threadIdToSequenceIdMapIterator = map.entrySet().iterator(); threadIdToSequenceIdMapIterator.hasNext();) {
entry = (Map.Entry)threadIdToSequenceIdMapIterator.next();
}
ThreadIdentifier tid = (ThreadIdentifier)entry.getKey();
SequenceIdAndExpirationObject seo = (SequenceIdAndExpirationObject)entry
.getValue();
long sequenceId = seo.getSequenceId();
EventID evId = new EventID(tid.getMembershipID(), tid.getThreadID(),
sequenceId);
synchronized(map) {
map.clear();
}
return evId;
}
/**
* Validates that the eventId of the event received in callback is contained
* in the eventId array originally used by client1 to generate the events and
* notifies client2 to proceed for validation once the LAST_KEY is received
*
* @param key -
* the key of the event for EntryEvent / token indicating type of
* region operation for RegionEvent
*/
public static void validateEventsAtReceivingClientListener(String key)
{
EventID eventIdAtClient2 = (EventID)assertThreadIdToSequenceIdMapHasEntryId();
if ((eventIdAtClient2.getThreadID() == eventIdForLastKey.getThreadID())
&& (eventIdAtClient2.getSequenceID() == eventIdForLastKey
.getSequenceID())) {
synchronized (EventIdOptimizationDUnitTest.class) {
getLogWriter().info("Notifying client2 to proceed for validation");
proceedForValidation = true;
EventIdOptimizationDUnitTest.class.notify();
}
}
else {
boolean containsEventId = false;
for (int i = 0; i < eventIds.length; i++) {
if ((eventIdAtClient2.getThreadID() == eventIds[i].getThreadID())
&& (eventIdAtClient2.getSequenceID() == eventIds[i].getSequenceID())) {
containsEventId = true;
break;
}
}
if (!containsEventId) {
validationFailed = true;
failureMsg.append("key = ").append(key).append(" ; eventID = ").append(
eventIdAtClient2).append(System.getProperty("line.separator"));
}
}
}
}