/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.ha;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.AttributesMutator;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.MirrorType;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.Connection;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.client.internal.QueueStateImpl.SequenceIdAndExpirationObject;
import org.apache.geode.cache.client.internal.ServerRegionProxy;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache30.ClientServerTestCase;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.EventIDHolder;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
/**
* This test verifies that an eventId, while being sent across the network (client to server,
* server to client and peer to peer), goes as an optimized byte-array. For client to server
* messages, the membership-id part of the event-id does not need to be sent with each event.
* Also, the threadId and sequenceId need not be sent as longs if their values are small. This
* test has two servers and two clients, each client connected to one server. Events with
* event-ids having specific values for thread-id and sequence-id are generated by client-1, sent
* to server-1, propagated to server-2 via p2p, and finally delivered to client-2. The test
* verifies that client-2 receives the same values for thread-id and sequence-id.
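*
* <p>
* Illustrative sketch only (the helper below is hypothetical; the real encoding is done by the
* EventID serialization code): the idea is that a long threadId/sequenceId is written with the
* smallest primitive type that can represent it, roughly like
*
* <pre>{@code
* // hypothetical helper, for illustration of the optimization being tested
* static void writeOptimizedId(DataOutput out, long value) throws IOException {
*   if (value <= Byte.MAX_VALUE) {
*     out.writeByte(0); // marker: value follows as a byte
*     out.writeByte((int) value);
*   } else if (value <= Short.MAX_VALUE) {
*     out.writeByte(1); // marker: value follows as a short
*     out.writeShort((int) value);
*   } else if (value <= Integer.MAX_VALUE) {
*     out.writeByte(2); // marker: value follows as an int
*     out.writeInt((int) value);
*   } else {
*     out.writeByte(3); // marker: value follows as a long
*     out.writeLong(value);
*   }
* }
* }</pre>
*
* which is why the eventIds array below exercises threadId/sequenceId values at the byte, short,
* int and long boundaries.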
*/
@Category({ClientSubscriptionTest.class})
public class EventIdOptimizationDUnitTest extends JUnit4DistributedTestCase {
/** Cache-server1 */
VM server1 = null;
/** Cache-server2 */
VM server2 = null;
/** Client1, connected to Cache-server1 */
VM client1 = null;
/** Client2, connected to Cache-server2 */
VM client2 = null;
/** A long id (threadId or sequenceId) whose value fits in a byte */
private static final long ID_VALUE_BYTE = Byte.MAX_VALUE;
/** A long id (threadId or sequenceId) whose value fits in a short */
private static final long ID_VALUE_SHORT = Short.MAX_VALUE;
/** A long id (threadId or sequenceId) whose value fits in an int */
private static final long ID_VALUE_INT = Integer.MAX_VALUE;
/** A long id (threadId or sequenceId) whose value requires a full long */
private static final long ID_VALUE_LONG = Long.MAX_VALUE;
/** Name of the test region */
private static final String REGION_NAME = "EventIdOptimizationDUnitTest_region";
/** The cache instance for test cases */
protected static Cache cache = null;
/**
* Pool used to acquire connections for performing operations that carry specific eventIds
*/
private static PoolImpl pool = null;
/** Flag indicating that the receiving client can proceed with validation */
private static volatile boolean proceedForValidation = false;
/** Flag used to propagate a listener failure to the client */
private static volatile boolean validationFailed = false;
/** StringBuffer to hold the failure messages in client listener */
static StringBuffer failureMsg = new StringBuffer();
/**
* The last key used in each operation; its arrival tells the receiving client to proceed with
* validation
*/
private static final String LAST_KEY = "LAST_KEY";
/**
* The eventID for the last key, used to identify the last event so that the client can proceed
* with validation
*/
private static final EventID eventIdForLastKey = new EventID(new byte[] {1, 2}, 3, 4);
/**
* An array of eventIds having possible combinations of threadId and sequenceId values
*/
private static final EventID[] eventIds =
new EventID[] {new EventID(new byte[] {1, 1}, ID_VALUE_BYTE, ID_VALUE_BYTE),
new EventID(new byte[] {1, 1}, ID_VALUE_BYTE, ID_VALUE_SHORT),
new EventID(new byte[] {1, 1}, ID_VALUE_BYTE, ID_VALUE_INT),
new EventID(new byte[] {1, 1}, ID_VALUE_BYTE, ID_VALUE_LONG),
new EventID(new byte[] {1, 1}, ID_VALUE_SHORT, ID_VALUE_BYTE),
new EventID(new byte[] {1, 1}, ID_VALUE_SHORT, ID_VALUE_SHORT),
new EventID(new byte[] {1, 1}, ID_VALUE_SHORT, ID_VALUE_INT),
new EventID(new byte[] {1, 1}, ID_VALUE_SHORT, ID_VALUE_LONG),
new EventID(new byte[] {1, 1}, ID_VALUE_INT, ID_VALUE_BYTE),
new EventID(new byte[] {1, 1}, ID_VALUE_INT, ID_VALUE_SHORT),
new EventID(new byte[] {1, 1}, ID_VALUE_INT, ID_VALUE_INT),
new EventID(new byte[] {1, 1}, ID_VALUE_INT, ID_VALUE_LONG),
new EventID(new byte[] {1, 1}, ID_VALUE_LONG, ID_VALUE_BYTE),
new EventID(new byte[] {1, 1}, ID_VALUE_LONG, ID_VALUE_SHORT),
new EventID(new byte[] {1, 1}, ID_VALUE_LONG, ID_VALUE_INT),
new EventID(new byte[] {1, 1}, ID_VALUE_LONG, ID_VALUE_LONG)};
@Override
public final void postSetUp() throws Exception {
disconnectAllFromDS();
final Host host = Host.getHost(0);
server1 = host.getVM(0);
server2 = host.getVM(1);
client1 = host.getVM(2);
client2 = host.getVM(3);
int PORT1 = ((Integer) server1.invoke(() -> EventIdOptimizationDUnitTest.createServerCache()))
.intValue();
int PORT2 = ((Integer) server2.invoke(() -> EventIdOptimizationDUnitTest.createServerCache()))
.intValue();
client1.invoke(() -> EventIdOptimizationDUnitTest
.createClientCache1(NetworkUtils.getServerHostName(host), PORT1));
client2.invoke(() -> EventIdOptimizationDUnitTest
.createClientCache2(NetworkUtils.getServerHostName(host), PORT2));
}
/**
* Creates the cache
*
* @param props - the distributed system properties
* @throws Exception - thrown if any problem occurs while creating the cache
*/
private void createCache(Properties props) throws Exception {
DistributedSystem ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
}
/** Creates the cache and starts the cache server */
public static Integer createServerCache() throws Exception {
new EventIdOptimizationDUnitTest().createCache(new Properties());
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setMirrorType(MirrorType.KEYS_VALUES);
RegionAttributes attrs = factory.create();
cache.createRegion(REGION_NAME, attrs);
// create multiple dummy regions to use them in destroyRegion case for
// testing eventIDs
for (int i = 0; i < eventIds.length; i++) {
cache.createRegion(REGION_NAME + i, attrs);
}
CacheServer server = cache.addCacheServer();
assertNotNull(server);
int port = getRandomAvailableTCPPort();
server.setPort(port);
server.setNotifyBySubscription(true);
server.start();
return server.getPort();
}
/**
* Creates the client cache1, connected to server1
*
* @param hostName - the server host name
* @param port - the cache server port
* @throws Exception - thrown if any problem occurs in setting up the client
*/
public static void createClientCache1(String hostName, Integer port) throws Exception {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
new EventIdOptimizationDUnitTest().createCache(props);
AttributesFactory factory = new AttributesFactory();
ClientServerTestCase.configureConnectionPool(factory, hostName, port.intValue(), -1, true, -1,
2, null);
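// client1 drives its operations directly through the pool via ServerRegionProxy (see the
// generateEventsBy* methods), so no region is created from this factory and no listener is
// registered in this VM.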
final CacheServer bs1 = cache.addCacheServer();
bs1.setPort(port.intValue());
pool = (PoolImpl) PoolManager.find("testPool");
}
/**
* Creates the client cache2, connected to server2
*
* @param hostName - the server host name
* @param port - the cache server port
* @throws Exception - thrown if any problem occurs in setting up the client
*/
public static void createClientCache2(String hostName, Integer port) throws Exception {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
new EventIdOptimizationDUnitTest().createCache(props);
AttributesFactory factory = new AttributesFactory();
ClientServerTestCase.configureConnectionPool(factory, hostName, port.intValue(), -1, true, -1,
2, null);
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.addCacheListener(new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
String key = (String) event.getKey();
validateEventsAtReceivingClientListener(key);
}
@Override
public void afterDestroy(EntryEvent event) {
String key = (String) event.getKey();
validateEventsAtReceivingClientListener(key);
}
@Override
public void afterRegionDestroy(RegionEvent event) {
validateEventsAtReceivingClientListener(" <destroyRegion Event> ");
}
@Override
public void afterRegionClear(RegionEvent event) {
validateEventsAtReceivingClientListener(" <clearRegion Event> ");
}
});
RegionAttributes attrs = factory.create();
Region region = cache.createRegion(REGION_NAME, attrs);
region.registerInterest("ALL_KEYS");
for (int i = 0; i < eventIds.length; i++) {
region = cache.createRegion(REGION_NAME + i, attrs);
region.registerInterest("ALL_KEYS");
}
pool = (PoolImpl) PoolManager.find("testPool");
}
/**
* Generates events having specific values of threadId and sequenceId, via put operations through
* a connection object
*
* @throws Exception - thrown if any problem occurs in the put operation
*/
public static void generateEventsByPutOperation() throws Exception {
Connection connection = pool.acquireConnection();
String regionName = SEPARATOR + REGION_NAME;
ServerRegionProxy srp = new ServerRegionProxy(regionName, pool);
for (int i = 0; i < eventIds.length; i++) {
srp.putOnForTestsOnly(connection, "KEY-" + i, "VAL-" + i, eventIds[i], null);
}
srp.putOnForTestsOnly(connection, LAST_KEY, "LAST_VAL", eventIdForLastKey, null);
}
/**
* Generates events having specific values of threadId and sequenceId, via destroyEntry operations
* through a connection object
*
* @throws Exception - thrown if any problem occurs in the destroyEntry operation
*/
public static void generateEventsByDestroyEntryOperation() throws Exception {
Connection connection = pool.acquireConnection();
String regionName = SEPARATOR + REGION_NAME;
ServerRegionProxy srp = new ServerRegionProxy(regionName, pool);
for (int i = 0; i < eventIds.length; i++) {
String key = "KEY-" + i;
srp.putOnForTestsOnly(connection, key, "value-" + i, eventIds[i], null);
srp.destroyOnForTestsOnly(connection, key, null, Operation.DESTROY,
new EventIDHolder(eventIds[i]), null);
}
srp.putOnForTestsOnly(connection, LAST_KEY, "lastValue", eventIdForLastKey, null);
srp.destroyOnForTestsOnly(connection, LAST_KEY, null, Operation.DESTROY,
new EventIDHolder(eventIdForLastKey), null);
}
/**
* Generates events having specific values of threadId and sequenceId, via destroyRegion
* operations through a connection object
*
* @throws Exception - thrown if any problem occurs in the destroyRegion operation
*/
public static void generateEventsByDestroyRegionOperation() throws Exception {
Connection connection = pool.acquireConnection();
String regionName = SEPARATOR + REGION_NAME;
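// Only the first dummy region (REGION_NAME + "0") is destroyed here, so this loop exercises
// eventIds[0] only; the main region destroy below carries eventIdForLastKey.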
for (int i = 0; i < 1; i++) {
ServerRegionProxy srp = new ServerRegionProxy(regionName + i, pool);
srp.destroyRegionOnForTestsOnly(connection, eventIds[i], null);
}
{
ServerRegionProxy srp = new ServerRegionProxy(regionName, pool);
srp.destroyRegionOnForTestsOnly(connection, eventIdForLastKey, null);
}
}
/**
* Generates events having specific values of threadId and sequenceId, via clearRegion operations
* through a connection object
*
* @throws Exception - thrown if any problem occurs in the clearRegion operation
*/
public static void generateEventsByClearRegionOperation() throws Exception {
Connection connection = pool.acquireConnection();
String regionName = SEPARATOR + REGION_NAME;
ServerRegionProxy srp = new ServerRegionProxy(regionName, pool);
for (int i = 0; i < eventIds.length; i++) {
srp.clearOnForTestsOnly(connection, eventIds[i], null);
}
srp.clearOnForTestsOnly(connection, eventIdForLastKey, null);
}
/**
* Generates events having specific values of threadId and sequenceId from client1 via put
* operations and verifies that the values received on client2 match those sent from client1.
*
* @throws Exception - thrown if any exception occurs in the test
*/
@Test
public void testEventIdOptimizationByPutOperation() throws Exception {
client1.invoke(() -> EventIdOptimizationDUnitTest.generateEventsByPutOperation());
client2.invoke(() -> EventIdOptimizationDUnitTest.verifyEventIdsOnClient2());
}
/**
* Generates events having specific values of threadId and sequenceId from client1 via
* destroyEntry operations and verifies that the values received on client2 match those sent
* from client1.
*
* @throws Exception - thrown if any exception occurs in the test
*/
@Test
public void testEventIdOptimizationByDestroyEntryOperation() throws Exception {
client1.invoke(() -> EventIdOptimizationDUnitTest.generateEventsByDestroyEntryOperation());
client2.invoke(() -> EventIdOptimizationDUnitTest.verifyEventIdsOnClient2());
}
/**
* Generates events having specific values of threadId and sequenceId from client1 via
* destroyRegion operations and verifies that the values received on client2 match those sent
* from client1.
*
* @throws Exception - thrown if any exception occurs in the test
*/
@Test
public void testEventIdOptimizationByDestroyRegionOperation() throws Exception {
client1.invoke(() -> EventIdOptimizationDUnitTest.generateEventsByDestroyRegionOperation());
client2.invoke(() -> EventIdOptimizationDUnitTest.verifyEventIdsOnClient2());
}
/**
* Generates events having specific values of threadId and sequenceId from client1 via
* clearRegion operations and verifies that the values received on client2 match those sent from
* client1.
*
* @throws Exception - thrown if any exception occurs in the test
*/
@Test
public void testEventIdOptimizationByClearRegionOperation() throws Exception {
client1.invoke(() -> EventIdOptimizationDUnitTest.generateEventsByClearRegionOperation());
client2.invoke(() -> EventIdOptimizationDUnitTest.verifyEventIdsOnClient2());
}
/**
* Waits for the listener to receive all events and validates that no exception occurred in the
* client
*/
public static void verifyEventIdsOnClient2() {
await("Waiting for proceedForValidation to be true")
.until(() -> proceedForValidation);
LogWriterUtils.getLogWriter().info("Starting validation on client2");
if (validationFailed) {
fail(
"\n The following eventIds received by client2 were not present in the eventId array sent by client1 \n"
+ failureMsg);
}
LogWriterUtils.getLogWriter()
.info("Validation complete on client2, going to unregister listeners");
Region region = cache.getRegion(SEPARATOR + REGION_NAME);
if (region != null && !region.isDestroyed()) {
try {
AttributesMutator mutator = region.getAttributesMutator();
mutator.initCacheListeners(null);
} catch (RegionDestroyedException ignore) {
}
}
for (int i = 0; i < eventIds.length; i++) {
region = cache.getRegion(SEPARATOR + REGION_NAME + i);
if (region != null && !region.isDestroyed()) {
try {
AttributesMutator mutator = region.getAttributesMutator();
mutator.initCacheListeners(null);
} catch (RegionDestroyedException ignore) {
}
}
}
LogWriterUtils.getLogWriter().info("Test completed, Unregistered the listeners");
}
/**
* Closes the cache
*/
public static void closeCache() {
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
}
}
/**
* Closes the caches on clients and servers
*/
@Override
public final void preTearDown() throws Exception {
// close client
client1.invoke(() -> EventIdOptimizationDUnitTest.closeCache());
client2.invoke(() -> EventIdOptimizationDUnitTest.closeCache());
// close server
server1.invoke(() -> EventIdOptimizationDUnitTest.closeCache());
server2.invoke(() -> EventIdOptimizationDUnitTest.closeCache());
}
/**
* Asserts that the threadId-to-sequenceId map is not null and not empty, and returns an EventID
* built from its last entry.
*
* @return - the eventID object built from the last entry of the ThreadIdToSequenceIdMap
*/
public static Object assertThreadIdToSequenceIdMapHasEntryId() {
Map map = pool.getThreadIdToSequenceIdMap();
assertNotNull(map);
// The map size can now be 1 or 2 because of the server thread putting
// the marker in the queue. If it is 2, the first entry is the server
// thread; the second is the client thread. If it is 1, the entry is the
// client thread. The size changes because of the map.clear call below.
assertTrue(map.size() != 0);
// Set the entry to the last entry
Map.Entry entry = null;
for (Iterator threadIdToSequenceIdMapIterator =
map.entrySet().iterator(); threadIdToSequenceIdMapIterator.hasNext();) {
entry = (Map.Entry) threadIdToSequenceIdMapIterator.next();
}
ThreadIdentifier tid = (ThreadIdentifier) entry.getKey();
SequenceIdAndExpirationObject seo = (SequenceIdAndExpirationObject) entry.getValue();
long sequenceId = seo.getSequenceId();
EventID evId = new EventID(tid.getMembershipID(), tid.getThreadID(), sequenceId);
synchronized (map) {
map.clear();
}
return evId;
}
/**
* Validates that the eventId of the event received in the callback is contained in the eventId
* array originally used by client1 to generate the events, and notifies client2 to proceed with
* validation once the LAST_KEY is received
*
* @param key - the key of the event for an EntryEvent, or a token indicating the type of region
* operation for a RegionEvent
*/
public static void validateEventsAtReceivingClientListener(String key) {
EventID eventIdAtClient2 = (EventID) assertThreadIdToSequenceIdMapHasEntryId();
if ((eventIdAtClient2.getThreadID() == eventIdForLastKey.getThreadID())
&& (eventIdAtClient2.getSequenceID() == eventIdForLastKey.getSequenceID())) {
LogWriterUtils.getLogWriter().info("Notifying client2 to proceed for validation");
proceedForValidation = true;
} else {
boolean containsEventId = false;
for (int i = 0; i < eventIds.length; i++) {
if ((eventIdAtClient2.getThreadID() == eventIds[i].getThreadID())
&& (eventIdAtClient2.getSequenceID() == eventIds[i].getSequenceID())) {
containsEventId = true;
break;
}
}
if (!containsEventId) {
validationFailed = true;
failureMsg.append("key = ").append(key).append(" ; eventID = ").append(eventIdAtClient2)
.append(System.getProperty("line.separator"));
}
}
}
}