| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| package com.gemstone.gemfire.internal.cache.ha; |
| |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Properties; |
| |
| import com.gemstone.gemfire.cache.AttributesFactory; |
| import com.gemstone.gemfire.cache.AttributesMutator; |
| import com.gemstone.gemfire.cache.Cache; |
| import com.gemstone.gemfire.cache.CacheFactory; |
| import com.gemstone.gemfire.cache.EntryEvent; |
| import com.gemstone.gemfire.cache.MirrorType; |
| import com.gemstone.gemfire.cache.Operation; |
| import com.gemstone.gemfire.cache.Region; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.RegionEvent; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.client.PoolManager; |
| import com.gemstone.gemfire.cache.client.internal.PoolImpl; |
| import com.gemstone.gemfire.cache.util.BridgeServer; |
| import com.gemstone.gemfire.cache.util.CacheListenerAdapter; |
| import com.gemstone.gemfire.cache30.BridgeTestCase; |
| import com.gemstone.gemfire.distributed.DistributedSystem; |
| import com.gemstone.gemfire.internal.AvailablePort; |
| import com.gemstone.gemfire.internal.cache.EntryEventImpl; |
| import com.gemstone.gemfire.internal.cache.EventID; |
| import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy; |
| import com.gemstone.gemfire.cache.client.internal.Connection; |
| import com.gemstone.gemfire.cache.client.internal.QueueStateImpl.SequenceIdAndExpirationObject; |
| |
| import dunit.DistributedTestCase; |
| import dunit.Host; |
| import dunit.VM; |
| |
| /** |
| * This test verifies that eventId, while being sent across the network ( client |
| * to server, server to client and peer to peer) , goes as optimized byte-array. |
| * For client to server messages, the membership id part of event-id is not need |
| * to be sent with each event. Also, the threadId and sequenceId need not be |
| * sent as long if their value is small. This test has two servers and two |
| * clients , each connected to one server. The events with event-ids having |
| * specific values for thread-id and sequence-id are generated by client-1 and |
| * sent to server-1 and then to server-2 via p2p and then finally to client-2. |
| * It is verified that client-2 recieves the same values for thread-id and |
| * sequence-id. |
| * |
| * @author Dinesh Patel |
| * |
| */ |
| public class EventIdOptimizationDUnitTest extends DistributedTestCase |
| { |
| |
| /** Cache-server1 */ |
| VM server1 = null; |
| |
| /** Cache-server2 */ |
| VM server2 = null; |
| |
| /** Client1 , connected to Cache-server1 */ |
| VM client1 = null; |
| |
| /** Client2 , connected to Cache-server2 */ |
| VM client2 = null; |
| |
| /** The long id (threadId or sequenceId) having value equivalent to byte */ |
| private static final long ID_VALUE_BYTE = Byte.MAX_VALUE; |
| |
| /** The long id (threadId or sequenceId) having value equivalent to short */ |
| private static final long ID_VALUE_SHORT = Short.MAX_VALUE; |
| |
| /** The long id (threadId or sequenceId) having value equivalent to int */ |
| private static final long ID_VALUE_INT = Integer.MAX_VALUE; |
| |
| /** The long id (threadId or sequenceId) having value equivalent to long */ |
| private static final long ID_VALUE_LONG = Long.MAX_VALUE; |
| |
| /** Name of the test region */ |
| private static final String REGION_NAME = "EventIdOptimizationDUnitTest_region"; |
| |
| /** The cache instance for test cases */ |
| protected static Cache cache = null; |
| |
| /** |
| * Connection proxy object to get connection for performing events that will |
| * have specific eventIds |
| */ |
| private static PoolImpl pool = null; |
| |
| /** Boolean to indicate the client to proceed for validation */ |
| private static volatile boolean proceedForValidation = false; |
| |
| /** Boolean to propagate the failure in listener to the client */ |
| private static volatile boolean validationFailed = false; |
| |
| /** StringBuffer to hold the failure messages in client listener */ |
| static StringBuffer failureMsg = new StringBuffer(); |
| |
| /** The last key for operations, to notify for proceeding to validation */ |
| private static final String LAST_KEY = "LAST_KEY"; |
| |
| /** |
| * The eventID for the last key, used to identify the last event so that |
| * client can proceed for validation |
| */ |
| private static final EventID eventIdForLastKey = new EventID(new byte[] { 1, |
| 2 }, 3, 4); |
| |
| /** |
| * An array of eventIds having possible combinations of threadId and |
| * sequenceId values |
| */ |
| private static final EventID[] eventIds = new EventID[] { |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_BYTE, ID_VALUE_BYTE), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_BYTE, ID_VALUE_SHORT), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_BYTE, ID_VALUE_INT), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_BYTE, ID_VALUE_LONG), |
| |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_SHORT, ID_VALUE_BYTE), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_SHORT, ID_VALUE_SHORT), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_SHORT, ID_VALUE_INT), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_SHORT, ID_VALUE_LONG), |
| |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_INT, ID_VALUE_BYTE), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_INT, ID_VALUE_SHORT), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_INT, ID_VALUE_INT), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_INT, ID_VALUE_LONG), |
| |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_LONG, ID_VALUE_BYTE), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_LONG, ID_VALUE_SHORT), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_LONG, ID_VALUE_INT), |
| new EventID(new byte[] { 1, 1 }, ID_VALUE_LONG, ID_VALUE_LONG) }; |
| |
| /** Constructor */ |
| public EventIdOptimizationDUnitTest(String name) { |
| super(name); |
| } |
| |
| /** |
| * Sets up the cache-servers and clients for the test |
| * |
| * @throws Exception - |
| * thrown in any problem occurs in setUp |
| */ |
| public void setUp() throws Exception { |
| super.setUp(); |
| disconnectAllFromDS(); |
| |
| final Host host = Host.getHost(0); |
| server1 = host.getVM(0); |
| server2 = host.getVM(1); |
| client1 = host.getVM(2); |
| client2 = host.getVM(3); |
| |
| int PORT1 = ((Integer)server1.invoke(EventIdOptimizationDUnitTest.class, |
| "createServerCache")).intValue(); |
| int PORT2 = ((Integer)server2.invoke(EventIdOptimizationDUnitTest.class, |
| "createServerCache")).intValue(); |
| |
| client1.invoke(EventIdOptimizationDUnitTest.class, "createClientCache1", |
| new Object[] { getServerHostName(host), new Integer(PORT1) }); |
| client2.invoke(EventIdOptimizationDUnitTest.class, "createClientCache2", |
| new Object[] { getServerHostName(host), new Integer(PORT2) }); |
| |
| } |
| |
| /** |
| * Creates the cache |
| * |
| * @param props - |
| * distributed system props |
| * @throws Exception - |
| * thrown in any problem occurs in creating cache |
| */ |
| private void createCache(Properties props) throws Exception |
| { |
| DistributedSystem ds = getSystem(props); |
| cache = CacheFactory.create(ds); |
| assertNotNull(cache); |
| } |
| |
| /** Creates cache and starts the bridge-server */ |
| public static Integer createServerCache() throws Exception |
| { |
| new EventIdOptimizationDUnitTest("temp").createCache(new Properties()); |
| AttributesFactory factory = new AttributesFactory(); |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| factory.setMirrorType(MirrorType.KEYS_VALUES); |
| RegionAttributes attrs = factory.create(); |
| cache.createRegion(REGION_NAME, attrs); |
| |
| // create multiple dummy regions to use them in destroyRegion case for |
| // testing eventIDs |
| for (int i = 0; i < eventIds.length; i++) { |
| cache.createRegion(REGION_NAME + i, attrs); |
| } |
| BridgeServer server = cache.addBridgeServer(); |
| assertNotNull(server); |
| int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); |
| server.setPort(port); |
| server.setNotifyBySubscription(true); |
| server.start(); |
| return new Integer(server.getPort()); |
| } |
| |
| /** |
| * Creates the client cache1, connected to server1 |
| * |
| * @param port - |
| * bridgeserver port |
| * @throws Exception - |
| * thrown if any problem occurs in setting up the client |
| */ |
| public static void createClientCache1(String hostName, Integer port) throws Exception |
| { |
| Properties props = new Properties(); |
| props.setProperty("mcast-port", "0"); |
| props.setProperty("locators", ""); |
| new EventIdOptimizationDUnitTest("temp").createCache(props); |
| |
| AttributesFactory factory = new AttributesFactory(); |
| BridgeTestCase.configureConnectionPool(factory, hostName, port.intValue(),-1, true, -1, 2, null); |
| final BridgeServer bs1 = cache.addBridgeServer(); |
| bs1.setPort(port.intValue()); |
| |
| pool = (PoolImpl)PoolManager.find("testPool"); |
| } |
| |
| /** |
| * Creates the client cache2, connected to server3 |
| * |
| * @param port - |
| * bridgeserver port |
| * @throws Exception - |
| * thrown if any problem occurs in setting up the client |
| */ |
| public static void createClientCache2(String hostName, Integer port) throws Exception |
| { |
| Properties props = new Properties(); |
| props.setProperty("mcast-port", "0"); |
| props.setProperty("locators", ""); |
| new EventIdOptimizationDUnitTest("temp").createCache(props); |
| AttributesFactory factory = new AttributesFactory(); |
| BridgeTestCase.configureConnectionPool(factory, hostName, port.intValue(),-1, true, -1, 2, null); |
| |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| |
| factory.addCacheListener(new CacheListenerAdapter() { |
| public void afterCreate(EntryEvent event) |
| { |
| String key = (String)event.getKey(); |
| validateEventsAtReceivingClientListener(key); |
| } |
| |
| public void afterDestroy(EntryEvent event) |
| { |
| String key = (String)event.getKey(); |
| validateEventsAtReceivingClientListener(key); |
| } |
| |
| public void afterRegionDestroy(RegionEvent event) |
| { |
| |
| validateEventsAtReceivingClientListener(" <destroyRegion Event> "); |
| } |
| |
| public void afterRegionClear(RegionEvent event) |
| { |
| |
| validateEventsAtReceivingClientListener(" <clearRegion Event> "); |
| } |
| |
| }); |
| RegionAttributes attrs = factory.create(); |
| Region region = cache.createRegion(REGION_NAME, attrs); |
| region.registerInterest("ALL_KEYS"); |
| for (int i = 0; i < eventIds.length; i++) { |
| region = cache.createRegion(REGION_NAME + i, attrs); |
| region.registerInterest("ALL_KEYS"); |
| } |
| |
| pool = (PoolImpl)PoolManager.find("testPool"); |
| } |
| |
| /** |
| * Generates events having specific values of threadId and sequenceId, via put |
| * operation through connection object |
| * |
| * @throws Exception - |
| * thrown if any problem occurs in put operation |
| */ |
| public static void generateEventsByPutOperation() throws Exception |
| { |
| Connection connection = pool.acquireConnection(); |
| String regionName = Region.SEPARATOR + REGION_NAME; |
| ServerRegionProxy srp = new ServerRegionProxy(regionName, pool); |
| |
| for (int i = 0; i < eventIds.length; i++) { |
| srp.putOnForTestsOnly(connection, "KEY-" + i, "VAL-" + i, eventIds[i], null); |
| } |
| srp.putOnForTestsOnly(connection, LAST_KEY, "LAST_VAL", eventIdForLastKey, null); |
| } |
| |
| /** |
| * Generates events having specific values of threadId and sequenceId, via |
| * destroyEntry operation through connection object |
| * |
| * @throws Exception - |
| * thrown if any problem occurs in destroyEntry operation |
| */ |
| public static void generateEventsByDestroyEntryOperation() throws Exception |
| { |
| Connection connection = pool.acquireConnection(); |
| String regionName = Region.SEPARATOR + REGION_NAME; |
| ServerRegionProxy srp = new ServerRegionProxy(regionName, pool); |
| |
| for (int i = 0; i < eventIds.length; i++) { |
| srp.destroyOnForTestsOnly(connection, "KEY-" + i, null, Operation.DESTROY, new EntryEventImpl(eventIds[i]), null); |
| } |
| srp.destroyOnForTestsOnly(connection, LAST_KEY, null, Operation.DESTROY, new EntryEventImpl(eventIdForLastKey), null); |
| } |
| |
| /** |
| * Generates events having specific values of threadId and sequenceId, via |
| * destroyRegionOperation through connection object |
| * |
| * @throws Exception - |
| * thrown if any problem occurs in destroyRegionOperation |
| */ |
| public static void generateEventsByDestroyRegionOperation() throws Exception |
| { |
| Connection connection = pool.acquireConnection(); |
| String regionName = Region.SEPARATOR + REGION_NAME; |
| |
| for (int i = 0; i < 1; i++) { |
| ServerRegionProxy srp = new ServerRegionProxy(regionName+i, pool); |
| srp.destroyRegionOnForTestsOnly(connection, eventIds[i], null); |
| } |
| { |
| ServerRegionProxy srp = new ServerRegionProxy(regionName, pool); |
| srp.destroyRegionOnForTestsOnly(connection, eventIdForLastKey, null); |
| } |
| } |
| |
| /** |
| * Generates events having specific values of threadId and sequenceId, via |
| * clearRegionOperation through connection object |
| * |
| * @throws Exception - |
| * thrown if any problem occurs in clearRegionOperation |
| */ |
| public static void generateEventsByClearRegionOperation() throws Exception |
| { |
| Connection connection = pool.acquireConnection(); |
| String regionName = Region.SEPARATOR + REGION_NAME; |
| ServerRegionProxy srp = new ServerRegionProxy(regionName, pool); |
| |
| for (int i = 0; i < eventIds.length; i++) { |
| srp.clearOnForTestsOnly(connection, eventIds[i], null); |
| } |
| srp.clearOnForTestsOnly(connection, eventIdForLastKey, null); |
| } |
| |
| /** |
| * Generates events having specific values of threadId and sequenceId from |
| * client1 via put operation and verifies that the values received on client2 |
| * match with those sent from client1. |
| * |
| * @throws Exception - |
| * thrown if any exception occurs in test |
| */ |
| public void testEventIdOptimizationByPutOperation() throws Exception |
| { |
| client1.invoke(EventIdOptimizationDUnitTest.class, |
| "generateEventsByPutOperation"); |
| client2.invoke(EventIdOptimizationDUnitTest.class, |
| "verifyEventIdsOnClient2"); |
| |
| } |
| |
| /** |
| * Generates events having specific values of threadId and sequenceId from |
| * client1 via destroyEntry operation and verifies that the values received on |
| * client2 match with those sent from client1. |
| * |
| * @throws Exception - |
| * thrown if any exception occurs in test |
| */ |
| public void testEventIdOptimizationByDestroyEntryOperation() throws Exception |
| { |
| client1.invoke(EventIdOptimizationDUnitTest.class, |
| "generateEventsByDestroyEntryOperation"); |
| client2.invoke(EventIdOptimizationDUnitTest.class, |
| "verifyEventIdsOnClient2"); |
| } |
| |
| /** |
| * Generates events having specific values of threadId and sequenceId from |
| * client1 via destroyRegion operation and verifies that the values received |
| * on client2 match with those sent from client1. |
| * |
| * @throws Exception - |
| * thrown if any exception occurs in test |
| */ |
| public void testEventIdOptimizationByDestroyRegionOperation() |
| throws Exception |
| { |
| client1.invoke(EventIdOptimizationDUnitTest.class, |
| "generateEventsByDestroyRegionOperation"); |
| client2.invoke(EventIdOptimizationDUnitTest.class, |
| "verifyEventIdsOnClient2"); |
| } |
| |
| /** |
| * Generates events having specific values of threadId and sequenceId from |
| * client1 via clearRegion operation and verifies that the values received on |
| * client2 match with those sent from client1. |
| * |
| * @throws Exception - |
| * thrown if any exception occurs in test |
| */ |
| public void testEventIdOptimizationByClearRegionOperation() throws Exception |
| { |
| client1.invoke(EventIdOptimizationDUnitTest.class, |
| "generateEventsByClearRegionOperation"); |
| client2.invoke(EventIdOptimizationDUnitTest.class, |
| "verifyEventIdsOnClient2"); |
| } |
| |
| /** |
| * Waits for the listener to receive all events and validates that no |
| * exception occured in client |
| */ |
| public static void verifyEventIdsOnClient2() |
| { |
| if (!proceedForValidation) { |
| synchronized (EventIdOptimizationDUnitTest.class) { |
| if (!proceedForValidation) |
| try { |
| getLogWriter().info( |
| "Client2 going in wait before starting validation"); |
| EventIdOptimizationDUnitTest.class.wait(); |
| } |
| catch (InterruptedException e) { |
| fail("interrupted"); |
| } |
| } |
| } |
| getLogWriter().info("Starting validation on client2"); |
| if (validationFailed) { |
| fail("\n The following eventIds recieved by client2 were not present in the eventId array sent by client1 \n" |
| + failureMsg); |
| } |
| getLogWriter().info("Validation complete on client2, goin to unregister listeners"); |
| |
| Region region = cache.getRegion(Region.SEPARATOR + REGION_NAME); |
| if (region != null && !region.isDestroyed()) { |
| try { |
| AttributesMutator mutator = region.getAttributesMutator(); |
| mutator.initCacheListeners(null); |
| } |
| catch (RegionDestroyedException ignore) { |
| } |
| } |
| |
| for (int i = 0; i < eventIds.length; i++) { |
| region = cache.getRegion(Region.SEPARATOR + REGION_NAME + i); |
| if (region != null && !region.isDestroyed()) { |
| try { |
| AttributesMutator mutator = region.getAttributesMutator(); |
| mutator.initCacheListeners(null); |
| } |
| catch (RegionDestroyedException ignore) { |
| } |
| } |
| } |
| |
| getLogWriter().info("Test completed, Unregistered the listeners"); |
| } |
| |
| /** |
| * Closes the cache |
| * |
| */ |
| public static void closeCache() |
| { |
| if (cache != null && !cache.isClosed()) { |
| cache.close(); |
| cache.getDistributedSystem().disconnect(); |
| } |
| } |
| |
| /** |
| * Closes the caches on clients and servers |
| */ |
| public void tearDown2() throws Exception |
| { |
| // close client |
| client1.invoke(EventIdOptimizationDUnitTest.class, "closeCache"); |
| client2.invoke(EventIdOptimizationDUnitTest.class, "closeCache"); |
| // close server |
| server1.invoke(EventIdOptimizationDUnitTest.class, "closeCache"); |
| server2.invoke(EventIdOptimizationDUnitTest.class, "closeCache"); |
| |
| } |
| |
| /** |
| * Function to assert that the ThreadIdtoSequence id Map is not Null and has |
| * only one entry. |
| * |
| * @return - eventID object from the ThreadIdToSequenceIdMap |
| */ |
| public static Object assertThreadIdToSequenceIdMapHasEntryId() |
| { |
| Map map = pool.getThreadIdToSequenceIdMap(); |
| assertNotNull(map); |
| // The map size can now be 1 or 2 because of the server thread putting |
| // the marker in the queue. If it is 2, the first entry is the server |
| // thread; the second is the client thread. If it is 1, the entry is the |
| // client thread. The size changes because of the map.clear call below. |
| |
| assertTrue(map.size() != 0); |
| |
| // Set the entry to the last entry |
| Map.Entry entry = null; |
| for (Iterator threadIdToSequenceIdMapIterator = map.entrySet().iterator(); threadIdToSequenceIdMapIterator.hasNext();) { |
| entry = (Map.Entry)threadIdToSequenceIdMapIterator.next(); |
| } |
| |
| ThreadIdentifier tid = (ThreadIdentifier)entry.getKey(); |
| SequenceIdAndExpirationObject seo = (SequenceIdAndExpirationObject)entry |
| .getValue(); |
| long sequenceId = seo.getSequenceId(); |
| EventID evId = new EventID(tid.getMembershipID(), tid.getThreadID(), |
| sequenceId); |
| synchronized(map) { |
| map.clear(); |
| } |
| return evId; |
| } |
| |
| /** |
| * Validates that the eventId of the event received in callback is contained |
| * in the eventId array originally used by client1 to generate the events and |
| * notifies client2 to proceed for validation once the LAST_KEY is received |
| * |
| * @param key - |
| * the key of the event for EntryEvent / token indicating type of |
| * region operation for RegionEvent |
| */ |
| public static void validateEventsAtReceivingClientListener(String key) |
| { |
| EventID eventIdAtClient2 = (EventID)assertThreadIdToSequenceIdMapHasEntryId(); |
| if ((eventIdAtClient2.getThreadID() == eventIdForLastKey.getThreadID()) |
| && (eventIdAtClient2.getSequenceID() == eventIdForLastKey |
| .getSequenceID())) { |
| synchronized (EventIdOptimizationDUnitTest.class) { |
| getLogWriter().info("Notifying client2 to proceed for validation"); |
| proceedForValidation = true; |
| EventIdOptimizationDUnitTest.class.notify(); |
| } |
| } |
| else { |
| boolean containsEventId = false; |
| for (int i = 0; i < eventIds.length; i++) { |
| if ((eventIdAtClient2.getThreadID() == eventIds[i].getThreadID()) |
| && (eventIdAtClient2.getSequenceID() == eventIds[i].getSequenceID())) { |
| containsEventId = true; |
| break; |
| } |
| } |
| if (!containsEventId) { |
| validationFailed = true; |
| failureMsg.append("key = ").append(key).append(" ; eventID = ").append( |
| eventIdAtClient2).append(System.getProperty("line.separator")); |
| } |
| } |
| } |
| } |