blob: 284810021c4612de52535013bf49fc55acd9b6e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.tier.sockets;
import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.distributed.ConfigurationProperties.DELTA_PROPAGATION;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Properties;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.DeltaTestImpl;
import org.apache.geode.LogWriter;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.AttributesMutator;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.InterestPolicy;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.SubscriptionAttributes;
import org.apache.geode.cache.client.PoolManager;
import org.apache.geode.cache.client.internal.PoolImpl;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqAttributesFactory;
import org.apache.geode.cache.query.CqEvent;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.cache.util.CqListenerAdapter;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.TestObjectWithIdentifier;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
import org.apache.geode.test.junit.categories.SerializationTest;
/**
* Test client to server flow for delta propogation
*
* @since GemFire 6.1
*/
@Category({ClientSubscriptionTest.class, SerializationTest.class})
public class ClientToServerDeltaDUnitTest extends JUnit4DistributedTestCase {
/*
* Test configuration one server one client also 2 server 2 client
*/
private static Cache cache = null;
private static LogWriter logger = null;
VM server = null;
VM server2 = null;
VM client = null;
VM client2 = null;
private static final String KEY1 = "DELTA_KEY_1";
private static final String REGION_NAME = "ClientToServerDeltaDunitTest_region";
private static final int NO_PUT_OPERATION = 3;
private static PoolImpl pool;
private static Object[] putDelta = {"Anil", DeltaTestImpl.ERRONEOUS_STRING_FOR_FROM_DELTA, 100,
DeltaTestImpl.ERRONEOUS_INT_FOR_TO_DELTA};
private static int updates = 0;
private static int cqUpdates = 0;
private static int create = 0;
private static Object firstUpdate = null;
private static Object secondUpdate = null;
private static Boolean error;
private static boolean lastKeyReceived = false;
private static Region region = null;
private static CSDeltaTestImpl csDelta = null;
public static String DELTA_KEY = "DELTA_KEY";
private static final String[] CQs = new String[] {"select * from " + SEPARATOR + REGION_NAME,
"select * from " + SEPARATOR + REGION_NAME + " where intVar = 0",
"select * from " + SEPARATOR + REGION_NAME + " where intVar > 0",
"select * from " + SEPARATOR + REGION_NAME + " where intVar < 0"};
public static String LAST_KEY = "LAST_KEY";
@Rule
public DistributedRestoreSystemProperties restoreSystemProperties =
new DistributedRestoreSystemProperties();
@Override
public final void postSetUp() throws Exception {
disconnectAllFromDS();
final Host host = Host.getHost(0);
server = host.getVM(0);
server2 = host.getVM(1);
client = host.getVM(2);
client2 = host.getVM(3);
}
@Override
public final void preTearDown() throws Exception {
// reset all flags
DeltaTestImpl.resetDeltaInvokationCounters();
server.invoke(() -> DeltaTestImpl.resetDeltaInvokationCounters());
server2.invoke(() -> DeltaTestImpl.resetDeltaInvokationCounters());
client.invoke(() -> DeltaTestImpl.resetDeltaInvokationCounters());
client2.invoke(() -> DeltaTestImpl.resetDeltaInvokationCounters());
// close the clients first
client.invoke(() -> ClientToServerDeltaDUnitTest.closeCache());
client2.invoke(() -> ClientToServerDeltaDUnitTest.closeCache());
// then close the servers
server.invoke(() -> ClientToServerDeltaDUnitTest.closeCache());
server2.invoke(() -> ClientToServerDeltaDUnitTest.closeCache());
}
public void initialise(Boolean cq) {
initialise(Boolean.TRUE/* clone */, null, cq, Boolean.TRUE/* RI */,
Boolean.TRUE/* enable delta */);
}
public void initialise(Boolean clone, String[] queries, Boolean cq, Boolean RI,
Boolean enableDelta) {
Integer PORT1 = ((Integer) server.invoke(() -> ClientToServerDeltaDUnitTest
.createServerCache(Boolean.TRUE, Boolean.FALSE, clone, enableDelta))).intValue();
Integer PORT2 = ((Integer) server2.invoke(() -> ClientToServerDeltaDUnitTest
.createServerCache(Boolean.TRUE, Boolean.FALSE, clone, enableDelta))).intValue();
client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache(
NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE,
Boolean.FALSE, Boolean.FALSE));
client2.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache(
NetworkUtils.getServerHostName(server2.getHost()), new Integer(PORT2), Boolean.TRUE,
Boolean.FALSE, cq, queries, RI));
}
// Same as initialise() except listener flag is false.
public void initialise2(Boolean clone, String[] queries, Boolean cq, Boolean RI,
Boolean enableDelta) {
Integer PORT1 = ((Integer) server.invoke(() -> ClientToServerDeltaDUnitTest
.createServerCache(Boolean.FALSE, Boolean.FALSE, clone, enableDelta))).intValue();
Integer PORT2 = ((Integer) server2.invoke(() -> ClientToServerDeltaDUnitTest
.createServerCache(Boolean.FALSE, Boolean.FALSE, clone, enableDelta))).intValue();
client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache(
NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE,
Boolean.FALSE, Boolean.FALSE));
client2.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache(
NetworkUtils.getServerHostName(server2.getHost()), new Integer(PORT2), Boolean.TRUE,
Boolean.FALSE, cq, queries, RI));
}
/**
* This test does the following for single key (<b>failure of fromDelta- resending of full
* object</b>):<br>
* 1)Verifies that we donot loose attributes updates when delta fails<br>
*/
@Test
public void testSendingofFullDeltaObjectsWhenFromDeltaFails() {
initialise(false);
// set expected value on server
server.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[1],
(Integer) putDelta[2]));
client.invoke(() -> ClientToServerDeltaDUnitTest.putWithFromDeltaERR(KEY1));
assertTrue("to Delta Propagation feature NOT used.",
((Boolean) client.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())).booleanValue());
assertTrue("from Delta Propagation feature NOT used.",
((Boolean) server.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())).booleanValue());
assertFalse("Delta Propagation toDeltaFailed",
((Boolean) client.invoke(() -> DeltaTestImpl.isToDeltaFailure())).booleanValue());
assertTrue("Delta Propagation fromDelta not Failed",
((Boolean) server.invoke(() -> DeltaTestImpl.isFromDeltaFailure())).booleanValue());
boolean err =
((Boolean) server.invoke(() -> ClientToServerDeltaDUnitTest.getError())).booleanValue();
assertFalse("validation fails", err);
}
/**
* This test does the following for single key</br>
* 1)Verifies that we donot loose attributes updates when delta fails<br>
*/
@Test
public void testPutForDeltaObjects() {
initialise(false);
// set expected value on server
server.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[0],
(Integer) putDelta[2]));
client.invoke(() -> ClientToServerDeltaDUnitTest.put(KEY1));
assertTrue("to Delta Propagation feature NOT used.",
((Boolean) client.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())).booleanValue());
assertTrue("from Delta Propagation feature NOT used.",
((Boolean) server.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())).booleanValue());
assertFalse("Delta Propagation toDeltaFailed",
((Boolean) client.invoke(() -> DeltaTestImpl.isToDeltaFailure())).booleanValue());
assertFalse("Delta Propagation fromDeltaFailed",
((Boolean) server.invoke(() -> DeltaTestImpl.isFromDeltaFailure())).booleanValue());
boolean err =
((Boolean) server.invoke(() -> ClientToServerDeltaDUnitTest.getError())).booleanValue();
assertFalse("validation fails", err);
}
/**
* This test does the following for single key (<b> full cycle</b>):<br>
* 1)Verifies that client-server, peer-peer and server-client processing delta<br>
*/
@Test
public void testClientToClientDeltaPropagation() throws Exception {
initialise(false);
// set expected value on s1,s1 and c2
server.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[0],
(Integer) putDelta[2]));
server2.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[0],
(Integer) putDelta[2]));
client2.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[0],
(Integer) putDelta[2]));
client.invoke(() -> ClientToServerDeltaDUnitTest.putDelta(KEY1));
Thread.sleep(5000);
assertTrue("To Delta Propagation feature NOT used.",
((Boolean) client.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())).booleanValue());
assertTrue("From Delta Propagation feature NOT used.",
((Boolean) server.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())).booleanValue());
// toDelta() should not be invoked
assertFalse("To Delta Propagation feature used.",
((Boolean) server.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())).booleanValue());
assertTrue("From Delta Propagation feature NOT used.",
((Boolean) server2.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())).booleanValue());
// toDelta() should not be invoked
assertFalse("To Delta Propagation feature used.",
((Boolean) server2.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed())).booleanValue());
assertTrue("from Delta Propagation feature NOT used.",
((Boolean) client2.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed())).booleanValue());
boolean err =
((Boolean) server.invoke(() -> ClientToServerDeltaDUnitTest.getError())).booleanValue();
err = ((Boolean) server2.invoke(() -> ClientToServerDeltaDUnitTest.getError())).booleanValue();
err = ((Boolean) client2.invoke(() -> ClientToServerDeltaDUnitTest.getError())).booleanValue();
assertFalse("validation fails", err);
}
@Test
public void testClientDeltaPropogationPutFetchesTheLatestValueWhenClientVersionIsOlder()
throws Exception {
// client did not register interest
Integer PORT1 = ((Integer) server.invoke(() -> ClientToServerDeltaDUnitTest
.createServerCache(Boolean.TRUE, Boolean.FALSE, Boolean.TRUE, Boolean.TRUE))).intValue();
ClientToServerDeltaDUnitTest.createClientCache(
NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE,
Boolean.FALSE, Boolean.FALSE, null, Boolean.FALSE);
Region r = cache.getRegion(REGION_NAME);
DeltaTestImpl val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
r.put(KEY1, val);
server.invoke(() -> {
Region region = cache.getRegion(REGION_NAME);
DeltaTestImpl val1 = (DeltaTestImpl) region.get(KEY1);
val1.NEED_TO_RESET_T0_DELTA = false;
val1.setIntVar(1);
region.put(KEY1, val1);
});
DeltaTestImpl val2 = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
val2.setStr("1");
val2.NEED_TO_RESET_T0_DELTA = false;
r.put(KEY1, val2);
server.invoke(() -> {
Region region = cache.getRegion(REGION_NAME);
assertThat((DeltaTestImpl) region.get(KEY1)).isNotNull();
});
DeltaTestImpl expected = new DeltaTestImpl(1, "1", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
server.invoke(() -> {
Region region = cache.getRegion(REGION_NAME);
GeodeAwaitility.await()
.untilAsserted(() -> assertThat((DeltaTestImpl) region.get(KEY1)).isEqualTo(expected));
});
GeodeAwaitility.await()
.untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected));
}
@Test
public void clientDeltaPutFetchesTheLatestVersionIfNotYetReceivedQueuedEvent() {
server.invoke(() -> setSlowStartForTesting());
server2.invoke(() -> setSlowStartForTesting());
initialise(false);
DeltaTestImpl original = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
client.invoke(() -> {
Region r = cache.getRegion(REGION_NAME);
original.NEED_TO_RESET_T0_DELTA = false;
r.put(KEY1, original);
});
client2.invoke(() -> {
Region r = cache.getRegion(REGION_NAME);
assertThat(r.get(KEY1)).isEqualTo(original);
});
client.invoke(() -> {
Region r = cache.getRegion(REGION_NAME);
DeltaTestImpl val = (DeltaTestImpl) r.get(KEY1);
assertThat(val).isEqualTo(original);
val.NEED_TO_RESET_T0_DELTA = false;
val.setIntVar(1);
r.put(KEY1, val);
});
client2.invoke(() -> {
Region r = cache.getRegion(REGION_NAME);
DeltaTestImpl val = (DeltaTestImpl) r.get(KEY1);
// delta update should not arrive yet due to slow dispatcher
assertThat(val).isEqualTo(original);
val.NEED_TO_RESET_T0_DELTA = false;
val.setStr("1");
r.put(KEY1, val);
Object o = r.get(KEY1);
logger.info("object is " + o);
});
server.invoke(() -> {
Region r = cache.getRegion(REGION_NAME);
r.get(KEY1);
});
server2.invoke(() -> {
Region r = cache.getRegion(REGION_NAME);
r.get(KEY1);
});
DeltaTestImpl expected = new DeltaTestImpl(1, "1", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
client.invoke(() -> {
Region r = cache.getRegion(REGION_NAME);
GeodeAwaitility.await()
.untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected));
});
client2.invoke(() -> {
Region r = cache.getRegion(REGION_NAME);
GeodeAwaitility.await()
.untilAsserted(() -> assertThat((DeltaTestImpl) r.get(KEY1)).isEqualTo(expected));
});
}
private void setSlowStartForTesting() {
CacheClientProxy.isSlowStartForTesting = true;
System.setProperty("slowStartTimeForTesting", "5000");
}
private static void putDeltaForCQ(String key, Integer numOfPuts, Integer[] cqIndices,
Boolean[] satisfyQuery) {
Region region = cache.getRegion(REGION_NAME);
DeltaTestImpl val = null;
for (int j = 0; j < numOfPuts; j++) {
val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
for (int i = 0; i < cqIndices.length; i++) {
switch (i) {
case 0:
val.setStr("CASE_0");
// select *
break;
case 1:
val.setStr("CASE_1");
// select where intVar = 0
if (satisfyQuery[i]) {
val.setIntVar(0);
} else {
val.setIntVar(100);
}
break;
case 2:
val.setStr("CASE_2");
// select where intVar > 0
if (satisfyQuery[i]) {
val.setIntVar(100);
} else {
val.setIntVar(-100);
}
break;
case 3:
val.setStr("CASE_3");
// select where intVar < 0
if (satisfyQuery[i]) {
val.setIntVar(-100);
} else {
val.setIntVar(100);
}
break;
default:
break;
}
}
region.put(key, val);
}
}
/*
* put delta ; not previous deltas
*/
private static void putDelta(String key) {
Region r = cache.getRegion(REGION_NAME);
DeltaTestImpl val = null;
for (int i = 0; i < NO_PUT_OPERATION; i++) {
val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
switch (i) {
case 1:
val.setStr((String) putDelta[0]);
break;
case 2:
val.setIntVar(((Integer) putDelta[2]).intValue());
break;
}
r.put(key, val);
}
}
private static void putLastKey() {
Region r = cache.getRegion(REGION_NAME);
r.put(LAST_KEY, LAST_KEY);
}
private static void putLastKeyWithDelta() {
Region r = cache.getRegion(REGION_NAME);
r.put(LAST_KEY, new DeltaTestImpl());
}
private static void setFirstSecondUpdate(Object first, Object second) {
firstUpdate = first;
secondUpdate = second;
}
private static Boolean getError() {
return error;
}
/*
* put delta full cycle
*/
private static void put(String key) {
Region r = cache.getRegion(REGION_NAME);
DeltaTestImpl val = null;
for (int i = 0; i < NO_PUT_OPERATION; i++) {
switch (i) {
case 0:
val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
break;
case 1:
val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
val.setStr((String) putDelta[0]);
break;
case 2:
val = new DeltaTestImpl(0, (String) putDelta[1], new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
val.setIntVar(((Integer) putDelta[2]).intValue());
break;
}
r.put(key, val);
}
}
/*
* put delta with some times fromDelta fails to apply client sends back full object
*/
private static void putWithFromDeltaERR(String key) {
Region r = cache.getRegion(REGION_NAME);
DeltaTestImpl val = null;
for (int i = 0; i < NO_PUT_OPERATION; i++) {
switch (i) {
case 0:
val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
break;
case 1:
val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
val.setStr((String) putDelta[1]);
break;
case 2:
val = new DeltaTestImpl(0, (String) putDelta[1], new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
val.setIntVar(((Integer) putDelta[2]).intValue());
break;
}
r.put(key, val);
}
}
/*
* put delta with some times toDelta fails to apply; client sends back full object
*/
private static void putWithTODeltaERR(String key) {
Region r = cache.getRegion(REGION_NAME);
DeltaTestImpl val = null;
for (int i = 0; i < NO_PUT_OPERATION; i++) {
switch (i) {
case 0:
val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
break;
case 1:
val = new DeltaTestImpl(0, "0", new Double(0), new byte[0],
new TestObjectWithIdentifier("0", 0));
val.setIntVar(((Integer) putDelta[3]).intValue());
break;
case 2:
val = new DeltaTestImpl(((Integer) putDelta[3]).intValue(), "0", new Double(0),
new byte[0], new TestObjectWithIdentifier("0", 0));
val.setStr((String) putDelta[0]);
break;
}
r.put(key, val);
}
}
public static Integer createServerCache(Boolean attachListener, Boolean isEmpty)
throws Exception {
return createServerCache(attachListener, isEmpty, Boolean.TRUE/* clone */,
Boolean.TRUE/* enable delta */);
}
/*
* create server cache
*/
public static Integer createServerCache(Boolean attachListener, Boolean isEmpty, Boolean clone,
Boolean enableDelta) throws Exception {
// for validation
updates = 0;
create = 0;
firstUpdate = null;
secondUpdate = null;
error = false;
Properties props = new Properties();
props.setProperty(DELTA_PROPAGATION, enableDelta.toString());
new ClientToServerDeltaDUnitTest().createCache(props);
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setConcurrencyChecksEnabled(true);
if (isEmpty) {
factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
factory.setDataPolicy(DataPolicy.EMPTY);
} else {
factory.setDataPolicy(DataPolicy.REPLICATE);
}
factory.setCloningEnabled(clone);
RegionAttributes attrs = factory.create();
region = cache.createRegion(REGION_NAME, attrs);
AttributesMutator am = region.getAttributesMutator();
if (attachListener) {
am.addCacheListener(new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
create++;
}
@Override
public void afterUpdate(EntryEvent event) {
switch (updates) {
case 0:
// first delta
validateUpdates(event, firstUpdate, "FIRST");
updates++;
break;
case 1:
// combine delta
validateUpdates(event, firstUpdate, "FIRST");
validateUpdates(event, secondUpdate, "SECOND");
updates++;
break;
default:
break;
}
}
});
} else if (!isEmpty) {
am.addCacheListener(new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
switch (create) {
case 1:
validateUpdates(event, firstUpdate, "FIRST");
create++;
break;
case 2:
validateUpdates(event, secondUpdate, "SECOND");
create++;
break;
default:
create++;
break;
}
}
/*
* public void afterUpdate(EntryEvent event) { updates++; }
*/
});
}
CacheServer server = cache.addCacheServer();
int port = getRandomAvailableTCPPort();
server.setPort(port);
// ensures updates to be sent instead of invalidations
server.setNotifyBySubscription(true);
server.start();
return new Integer(server.getPort());
}
public static void createClientCache(String host, Integer port, Boolean attachListener,
Boolean isEmpty, Boolean isCq) throws Exception {
createClientCache(host, port, attachListener, isEmpty, isCq, new String[0], true);
}
public static void createClientCache(String host, Integer port, Boolean attachListener,
Boolean isEmpty, Boolean isCq, String[] cqQueryString) throws Exception {
createClientCache(host, port, attachListener, isEmpty, isCq, cqQueryString, true);
}
public static void createClientCache(String host, Integer port, Boolean attachListener,
Boolean isEmpty, Boolean isCq, String[] cqQueryString, Boolean registerInterestAll)
throws Exception {
createClientCache(host, port, attachListener, isEmpty, isCq, cqQueryString, registerInterestAll,
true);
}
/*
* create client cache
*/
public static void createClientCache(String host, Integer port, Boolean attachListener,
Boolean isEmpty, Boolean isCq, String[] cqQueryString, Boolean registerInterestAll,
Boolean enableSubscription) throws Exception {
updates = 0;
create = 0;
firstUpdate = null;
secondUpdate = null;
error = false;
lastKeyReceived = false;
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
new ClientToServerDeltaDUnitTest().createCache(props);
pool = (PoolImpl) PoolManager.createFactory().addServer(host, port.intValue())
.setMinConnections(2)
.setSubscriptionEnabled(enableSubscription).setSubscriptionRedundancy(0)
.setReadTimeout(10000).setPingInterval(1000).setSocketBufferSize(32768)
.create("ClientToServerDeltaDunitTestPool");
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setConcurrencyChecksEnabled(true);
if (isEmpty) {
factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
factory.setDataPolicy(DataPolicy.EMPTY);
} else {
factory.setDataPolicy(DataPolicy.NORMAL);
}
factory.setPoolName(pool.getName());
factory.setCloningEnabled(false);
// region with empty data policy
RegionAttributes attrs = factory.create();
region = cache.createRegion(REGION_NAME, attrs);
if (attachListener) {
region.getAttributesMutator().addCacheListener(new CacheListenerAdapter() {
@Override
public void afterCreate(EntryEvent event) {
create++;
if (LAST_KEY.equals(event.getKey())) {
lastKeyReceived = true;
} ;
}
@Override
public void afterUpdate(EntryEvent event) {
switch (updates) {
case 0:
// first delta
validateUpdates(event, firstUpdate, "FIRST");
updates++;
break;
case 1:
// combine delta
validateUpdates(event, firstUpdate, "FIRST");
validateUpdates(event, secondUpdate, "SECOND");
updates++;
break;
default:
break;
}
}
});
}
if (registerInterestAll) {
region.registerInterest("ALL_KEYS");
}
if (isCq) {
CqAttributesFactory cqf = new CqAttributesFactory();
CqListenerAdapter cqlist = new CqListenerAdapter() {
@Override
public void onEvent(CqEvent cqEvent) {
Object key = cqEvent.getKey();
if (LAST_KEY.equals(key)) {
lastKeyReceived = true;
}
logger.fine(
"CQ event received for (key, value): (" + key + ", " + cqEvent.getNewValue() + ")");
}
@Override
public void onError(CqEvent cqEvent) {
logger.fine("CQ error received for key: " + cqEvent.getKey());
}
};
cqf.addCqListener(cqlist);
CqAttributes cqa = cqf.create();
for (int i = 0; i < cqQueryString.length; i++) {
CqQuery cq = cache.getQueryService().newCq("Delta_Query_" + i, cqQueryString[i], cqa);
cq.execute();
}
}
}
/*
* create cache with properties
*/
private void createCache(Properties props) throws Exception {
DistributedSystem ds = getSystem(props);
cache = CacheFactory.create(ds);
assertNotNull(cache);
logger = cache.getLogger();
}
// to validate updates in listener
private static void validateUpdates(EntryEvent event, Object obj, String str) {
if (obj instanceof String)
assertTrue(str + " update missed ",
((DeltaTestImpl) event.getNewValue()).getStr().equals((String) obj));
else if (obj instanceof Integer)
assertTrue(str + " update missed ",
((DeltaTestImpl) event.getNewValue()).getIntVar() == (Integer) obj);
else
error = true;
}
/*
* close cache
*/
public static void closeCache() {
if (cache != null && !cache.isClosed()) {
cache.close();
cache.getDistributedSystem().disconnect();
lastKeyReceived = false;
}
}
/**
* This test does the following for single key:<br>
* 1)Verifies that cacheless client calls toDelta <br>
*/
@Test
public void testEmptyClientAsFeederToServer() {
Integer PORT1 = ((Integer) server
.invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.FALSE)))
.intValue();
server2
.invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.FALSE));
client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache(
NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE,
Boolean.TRUE, Boolean.FALSE));
/*
* server2.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate( putDelta[0],
* (Integer)putDelta[2] ));
*/
client.invoke(() -> ClientToServerDeltaDUnitTest.putDelta(KEY1));
client.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(2)));
server.invoke(() -> ClientToServerDeltaDUnitTest.checkFromdeltaCounter());
server.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(0)));
server2.invoke(() -> ClientToServerDeltaDUnitTest.checkFromdeltaCounter());
server2.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(0)));
}
/**
* This test does the following for single key:<br>
* 1)Verifies that from delta should not called on server with empty data policy just by passed
* delta to data store<br>
*/
@Test
public void testEmptyServerAsFeederToPeer() {
Integer PORT1 = ((Integer) server
.invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.TRUE)))
.intValue();
server2
.invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.FALSE));
client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache(
NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE,
Boolean.TRUE, Boolean.FALSE));
client.invoke(() -> ClientToServerDeltaDUnitTest.putDelta(KEY1));
client.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(2)));
server.invoke(() -> ClientToServerDeltaDUnitTest.checkDeltaFeatureNotUsed());
server2.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(0)));
server2.invoke(() -> ClientToServerDeltaDUnitTest.checkFromdeltaCounter());
}
/**
* This test does verifies that server with empty data policy sends deltas to the client which can
* handle deltas. Server sends full values which can not handle deltas.
*/
@Test
public void testClientsConnectedToEmptyServer() {
Integer PORT1 = ((Integer) server
.invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.TRUE)))
.intValue();
server2
.invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.FALSE));
client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache(
NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.TRUE,
Boolean.TRUE, Boolean.FALSE));
client2.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache(
NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.TRUE,
Boolean.FALSE, Boolean.FALSE));
int deltaSent =
(Integer) server2.invoke(() -> ClientToServerDeltaDUnitTest.putsWhichReturnsDeltaSent());
client.invoke(() -> ClientToServerDeltaDUnitTest.waitForLastKey());
client.invoke(() -> ClientToServerDeltaDUnitTest.checkForDelta());
client2.invoke(() -> ClientToServerDeltaDUnitTest.waitForLastKey());
client2.invoke(() -> ClientToServerDeltaDUnitTest.checkDeltaInvoked(new Integer(deltaSent)));
}
/**
* This test does the following for single key:<br>
* 1)Verifies that To delta called on client should be equal to fromDeltaCounter on datastore <br>
*/
@Test
public void testClientNonEmptyEmptyServerAsFeederToPeer() {
Integer PORT1 = ((Integer) server
.invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.TRUE)))
.intValue();
server2
.invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(Boolean.FALSE, Boolean.FALSE));
client.invoke(() -> ClientToServerDeltaDUnitTest.createClientCache(
NetworkUtils.getServerHostName(server.getHost()), new Integer(PORT1), Boolean.FALSE,
Boolean.FALSE, Boolean.FALSE));
client.invoke(() -> ClientToServerDeltaDUnitTest.putDelta(KEY1));
client.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(2)));
server.invoke(() -> ClientToServerDeltaDUnitTest.checkDeltaFeatureNotUsed());
server2.invoke(() -> ClientToServerDeltaDUnitTest.checkTodeltaCounter(new Integer(0)));
server2.invoke(() -> ClientToServerDeltaDUnitTest.checkFromdeltaCounter());
}
/**
* This test has client1 connected to server1 and client2 connected to server2. client2 has a CQ
* registered on the server2. client1 creates a key on server1 which satisfies the CQ and it then
* updates the same key with a value so that it sould not satisfy the CQ. The cloning is set to
* false. The test ensures that the client2 gets the update event. This implies that server2
* clones the old value prior to applying delta received from server1 and then processes the CQ.
*/
@Test
public void testC2CDeltaPropagationWithCQ() throws Exception {
initialise(false/* clone */, new String[] {CQs[1]}, true/* CQ */, true/* RI */,
true/* enable delta */);
client.invoke(ClientToServerDeltaDUnitTest.class, "createKeys",
new Object[] {new String[] {KEY1}});
client.invoke(ClientToServerDeltaDUnitTest.class, "putDeltaForCQ", new Object[] {KEY1,
new Integer(1), new Integer[] {1}, new Boolean[] {false, true, false, false}});
client.invoke(() -> ClientToServerDeltaDUnitTest.putLastKeyWithDelta());
client2.invoke(() -> ClientToServerDeltaDUnitTest.waitForLastKey());
client2.invoke(() -> ClientToServerDeltaDUnitTest.verifyDeltaReceived());
}
/**
* This test ensures that a server sends delta bytes to a client even if that client is not
* interested in that event but is getting the event only because the event satisfies a CQ which
* the client has registered with the server.
*/
@Test
public void testC2CDeltaPropagationWithCQWithoutRI() throws Exception {
initialise(false/* clone */, new String[] {CQs[1]}, true/* CQ */, false/* RI */,
true/* enable delta */);
client.invoke(ClientToServerDeltaDUnitTest.class, "createKeys",
new Object[] {new String[] {KEY1}});
client.invoke(ClientToServerDeltaDUnitTest.class, "putDeltaForCQ", new Object[] {KEY1,
new Integer(1), new Integer[] {1}, new Boolean[] {false, true, false, false}});
client.invoke(() -> ClientToServerDeltaDUnitTest.putLastKeyWithDelta());
client2.invoke(() -> ClientToServerDeltaDUnitTest.waitForLastKey());
server2.invoke(() -> ClientToServerDeltaDUnitTest.verifyDeltaSent(Integer.valueOf(1)));
}
@Test
public void testClientSendsFullValueToServerWhenDeltaOffAtServer() {
initialise2(false/* clone */, new String[] {CQs[1]}, false/* CQ */, true/* RI */,
false/* enable delta */);
// set expected value on server
server.invoke(() -> ClientToServerDeltaDUnitTest.setFirstSecondUpdate(putDelta[0],
(Integer) putDelta[2]));
client.invoke(() -> ClientToServerDeltaDUnitTest.putDelta(DELTA_KEY));
assertFalse("Delta Propagation feature used at client.",
(Boolean) client.invoke(() -> DeltaTestImpl.toDeltaFeatureUsed()));
assertFalse("Delta Propagation feature used at server.",
(Boolean) server.invoke(() -> DeltaTestImpl.fromDeltaFeatureUsed()));
assertFalse(
"Failures at client while calculating delta. But delta-propagation is false at server.",
((Boolean) client.invoke(() -> DeltaTestImpl.isToDeltaFailure())).booleanValue());
assertFalse(
"Failures at server while applying delta. But delta-propagation is false at server.",
(Boolean) server.invoke(() -> DeltaTestImpl.isFromDeltaFailure()));
}
@Test
public void testC2SDeltaPropagationWithOldValueInvalidatedAtServer() throws Exception {
String key = "DELTA_KEY";
Integer port1 = (Integer) server
.invoke(() -> ClientToServerDeltaDUnitTest.createServerCache(false, false, false, true));
createClientCache("localhost", port1, false, false, false, null, false, false);
LocalRegion region = (LocalRegion) cache.getRegion(REGION_NAME);
region.put(key, new DeltaTestImpl());
server.invoke(() -> ClientToServerDeltaDUnitTest.doInvalidate(key));
DeltaTestImpl value = new DeltaTestImpl();
value.setStr("UPDATED_VALUE");
region.put(key, value);
assertTrue(region.getCachePerfStats().getDeltasSent() == 1);
assertTrue(region.getCachePerfStats().getDeltaFullValuesSent() == 1);
}
public static void doInvalidate(String key) {
Region region = cache.getRegion(REGION_NAME);
region.invalidate(key);
}
public static void verifyDeltaReceived() {
assertTrue(
"Expected 1 fromDelta() invokation but was " + DeltaTestImpl.getFromDeltaInvokations(),
DeltaTestImpl.getFromDeltaInvokations() == 1);
}
public static void verifyDeltaSent(Integer deltas) {
CacheClientNotifier ccn = ((CacheServerImpl) cache.getCacheServers().toArray()[0]).getAcceptor()
.getCacheClientNotifier();
int numOfDeltasSent = ((CacheClientProxy) ccn.getClientProxies().toArray()[0]).getStatistics()
.getDeltaMessagesSent();
assertTrue("Expected " + deltas + " deltas to be sent but " + numOfDeltasSent + " were sent.",
numOfDeltasSent == deltas);
}
public static void checkFromdeltaCounter() {
assertTrue(
"FromDelta counters do not match, expected: " + (NO_PUT_OPERATION - 1) + ", actual: "
+ DeltaTestImpl.getFromDeltaInvokations(),
DeltaTestImpl.getFromDeltaInvokations() >= (NO_PUT_OPERATION - 1));
}
public static void checkTodeltaCounter(Integer count) {
assertTrue(
"ToDelta counters do not match, expected: " + count.intValue() + ", actual: "
+ DeltaTestImpl.getToDeltaInvokations(),
DeltaTestImpl.getToDeltaInvokations() >= count.intValue());
}
public static void checkDeltaFeatureNotUsed() {
assertTrue("Delta Propagation feature used.", !(DeltaTestImpl.deltaFeatureUsed()));
}
public static void createKeys(String[] keys) {
Region region = cache.getRegion(REGION_NAME);
for (int i = 0; i < keys.length; i++) {
region.create(keys[i], new DeltaTestImpl());
}
}
public static int putsWhichReturnsDeltaSent() throws Exception {
csDelta = new CSDeltaTestImpl();
region.put(DELTA_KEY, csDelta);
for (int i = 0; i < NO_PUT_OPERATION; i++) {
csDelta.setIntVar(i);
region.put(DELTA_KEY, csDelta);
}
region.put(LAST_KEY, "");
return csDelta.deltaSent;
}
public static void checkDeltaInvoked(Integer deltaSent) {
assertTrue(
"Delta applied :" + ((CSDeltaTestImpl) region.get(DELTA_KEY)).getDeltaApplied()
+ "\n Delta sent :" + deltaSent,
((CSDeltaTestImpl) region.get(DELTA_KEY)).getDeltaApplied() == deltaSent);
}
public static void checkForDelta() {
assertTrue("Delta sent to EMPTY data policy region",
DeltaTestImpl.getFromDeltaInvokations() == 0);
}
public static void waitForLastKey() {
WaitCriterion wc = new WaitCriterion() {
@Override
public boolean done() {
return ClientToServerDeltaDUnitTest.lastKeyReceived;
}
@Override
public String description() {
return "Last key NOT received.";
}
};
GeodeAwaitility.await().untilAsserted(wc);
}
static class CSDeltaTestImpl extends DeltaTestImpl {
int deltaSent = 0;
int deltaApplied = 0;
public CSDeltaTestImpl() {}
@Override
public void toDelta(DataOutput out) throws IOException {
super.toDelta(out);
deltaSent++;
}
@Override
public void fromDelta(DataInput in) throws IOException {
super.fromDelta(in);
deltaApplied++;
}
public int getDeltaSent() {
return deltaSent;
}
public int getDeltaApplied() {
return deltaApplied;
}
@Override
public String toString() {
return "CSDeltaTestImpl[deltaApplied=" + deltaApplied + "]" + super.toString();
}
}
}