// blob: 4dfa0fb66770a2cb7682a2e34f45e44974c9ba0e [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
/**
*
*/
package com.gemstone.gemfire.cache.query.dunit;
import java.util.Properties;
import com.gemstone.gemfire.cache.Cache;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.PartitionAttributesFactory;
import com.gemstone.gemfire.cache.PartitionResolver;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionShortcut;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
import com.gemstone.gemfire.cache.query.Index;
import com.gemstone.gemfire.cache.query.IndexType;
import com.gemstone.gemfire.cache.query.QueryService;
import com.gemstone.gemfire.cache.query.SelectResults;
import com.gemstone.gemfire.cache.query.data.Portfolio;
import com.gemstone.gemfire.cache.query.data.Position;
import com.gemstone.gemfire.cache.query.internal.QueryObserverHolder;
import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
import com.gemstone.gemfire.cache.query.partitioned.PRQueryDUnitHelper;
import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
import com.gemstone.gemfire.cache30.CacheTestCase;
import com.gemstone.gemfire.internal.cache.execute.PRClientServerTestBase;
import dunit.AsyncInvocation;
import dunit.DistributedTestCase;
import dunit.Host;
import dunit.SerializableRunnable;
import dunit.VM;
/**
 * Tests for data inconsistency when an entry is being updated in an index
 * while a query concurrently reads that same index, which is UNLOCKED.
 *
 * @author shobhit
 *
 */
public class QueryDataInconsistencyDUnitTest extends CacheTestCase {
/** First portfolio key (inclusive) put into the test region. */
private static final int cnt = 0;

/** End portfolio key (exclusive) put into the test region. */
private static final int cntDest = 10;

/** VM in which the cache, region, index, put and queries run; assigned in setUp(). */
static VM server;

/** Not assigned by any code visible in this class. */
static VM client;

/** Cache handle stored by the runnables that create regions. */
static Cache cache;

static String PartitionedRegionName1 = "TestPartitionedRegion1"; // default name

static String repRegionName = "TestRepRegion"; // default name

static Integer serverPort1;

/** Total bucket count used by createPR(). */
public static int numOfBuckets = 20;

public static String[] queries = {
  "select * from /" + PartitionedRegionName1 + " where ID=1"
};

public static String[] queriesForRR = {
  "<trace> select * from /" + repRegionName + " where ID=1"
};

private static PRQueryDUnitHelper PRQHelp = new PRQueryDUnitHelper("");

/**
 * Handshake flag between IndexManagerTestHook and the query runnables: the
 * hook sets it to true and polls until it becomes false again; the query
 * side waits for true, runs its checks, then sets it back to false.
 */
public static volatile boolean hooked = false;
/**
 * Creates the test case with the given name.
 *
 * @param name the test name, forwarded unchanged to the superclass constructor
 */
public QueryDataInconsistencyDUnitTest(String name) {
super(name);
}
/**
 * Tears the test down: disconnects every VM from the distributed system,
 * runs the superclass tear-down, then resets the QueryObserverHolder in
 * every VM.
 *
 * Each step runs in a finally block so that a failure in an earlier step
 * does not skip the later cleanup steps. The order of the steps is the same
 * as before.
 *
 * @throws Exception if any cleanup step fails
 */
@Override
public void tearDown2() throws Exception {
  try {
    invokeInEveryVM(CacheTestCase.class, "disconnectFromDS");
  } finally {
    try {
      super.tearDown2();
    } finally {
      invokeInEveryVM(QueryObserverHolder.class, "reset");
    }
  }
}
/**
 * Runs the superclass set-up and then selects VM 0 of host 0 as the
 * {@code server} VM used by every test in this class.
 *
 * @throws Exception if the superclass set-up fails
 */
@Override
public void setUp() throws Exception {
  super.setUp();
  server = Host.getHost(0).getVM(0);
}
/**
 * Queries a replicated region through an index on {@code ID} while a
 * concurrent put on key 1 is paused inside the index update by
 * IndexManagerTestHook.
 *
 * The put is started asynchronously and is paused by the hook twice. At each
 * pause a query for {@code ID = 1} is executed and must not return a
 * Portfolio whose ID differs from 1. After each check the put is released by
 * setting {@code hooked} back to false.
 *
 * On any failure the hook is cleared and the put is released before the
 * failure propagates, so the put thread is not left spinning in the hook.
 */
public void testCompactRangeIndex() {
  // Create caches
  Properties props = new Properties();
  server.invoke(PRClientServerTestBase.class, "createCacheInVm",
      new Object[] { props });

  server.invoke(new CacheSerializableRunnable("create indexes") {
    @Override
    public void run2() throws CacheException {
      cache = CacheFactory.getAnyInstance();
      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);

      // Create common Portfolios and NewPortfolios
      for (int j = cnt; j < cntDest; j++) {
        region.put(Integer.valueOf(j), new Portfolio(j));
      }

      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
      try {
        Index index = qs.createIndex("idIndex", "ID", "/" + repRegionName);
        assertEquals(10, index.getStatistics().getNumberOfKeys());
      } catch (Exception e) {
        // Keep the cause so the failure report shows why creation failed.
        fail("Index creation failed", e);
      }
    }
  });

  // Start the update asynchronously; the test hook pauses it inside the
  // index update, first at one hook spot and then at a second one.
  AsyncInvocation putThread = server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
    @Override
    public void run2() throws CacheException {
      Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
      IndexManager.testHook = new IndexManagerTestHook();
      repRegion.put(Integer.valueOf(1), new Portfolio(cntDest + 1));
      // above call must be hooked in BEFORE_UPDATE_OP call.
    }
  });

  server.invoke(new CacheSerializableRunnable("query on server") {
    @Override
    public void run2() throws CacheException {
      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
      while (!hooked) {
        pause(100);
      }

      boolean verified = false;
      try {
        Object rs = null;
        try {
          rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
        } catch (Exception e) {
          fail("Query execution failed on server.", e);
        }
        assertTrue(rs instanceof SelectResults);
        assertEquals(1, ((SelectResults) rs).size());
        Portfolio p1 = (Portfolio) ((SelectResults) rs).asList().get(0);
        if (p1.getID() != 1) {
          fail("Query thread did not verify index results even when RE is under update");
        }
        verified = true;
      } finally {
        if (!verified) {
          // The test is aborting: remove the hook so the put is not
          // paused a second time with nobody left to release it.
          IndexManager.testHook = null;
        }
        hooked = false; // Let client put go further.
      }
    }
  });

  // Client put is again hooked in AFTER_UPDATE_OP call in updateIndex.
  server.invoke(new CacheSerializableRunnable("query on server") {
    @Override
    public void run2() throws CacheException {
      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
      while (!hooked) {
        pause(100);
      }

      try {
        Object rs = null;
        try {
          rs = qs.newQuery("<trace> select * from /" + repRegionName + " where ID = 1").execute();
        } catch (Exception e) {
          fail("Query execution failed on server.", e);
        }
        assertTrue(rs instanceof SelectResults);
        if (((SelectResults) rs).size() > 0) {
          Portfolio p1 = (Portfolio) ((SelectResults) rs).iterator().next();
          if (p1.getID() != 1) {
            fail("Query thread did not verify index results even when RE is under update and " +
                "RegionEntry value has been modified before releasing the lock");
          }
        }
      } finally {
        // Always clean up and release the put, even when a check failed.
        IndexManager.testHook = null;
        hooked = false; // Let client put go further.
      }
    }
  });

  DistributedTestCase.join(putThread, 200, this.getLogWriter());
}
/**
 * Queries a replicated region through an index on {@code pos.secId}
 * (from clause {@code /region p, p.positions.values pos}) while a concurrent
 * put on key 1 is paused inside the index update by IndexManagerTestHook.
 *
 * At the first pause the query must return a Position whose secId is
 * "APPL". At the second pause the query must not return a Position equal to
 * the one seen at the first pause.
 *
 * On any failure the hook is cleared and the put is released before the
 * failure propagates, so the put thread is not left spinning in the hook.
 */
public void testRangeIndex() {
  // Create caches
  Properties props = new Properties();
  server.invoke(PRClientServerTestBase.class, "createCacheInVm",
      new Object[] { props });

  server.invoke(new CacheSerializableRunnable("create indexes") {
    @Override
    public void run2() throws CacheException {
      cache = CacheFactory.getAnyInstance();
      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
      IndexManager.testHook = null;

      // Create common Portfolios and NewPortfolios
      Position.cnt = 0;
      for (int j = cnt; j < cntDest; j++) {
        Portfolio p = new Portfolio(j);
        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: portfolio "+j+ " : "+p);
        region.put(Integer.valueOf(j), p);
      }

      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
      try {
        Index index = qs.createIndex("posIndex", "pos.secId", "/"+repRegionName+" p, p.positions.values pos");
        assertEquals(12, index.getStatistics().getNumberOfKeys());
      } catch (Exception e) {
        // Keep the cause so the failure report shows why creation failed.
        fail("Index creation failed", e);
      }
    }
  });

  // Start the update asynchronously; the test hook pauses it inside the
  // index update, first at one hook spot and then at a second one.
  AsyncInvocation putThread = server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
    @Override
    public void run2() throws CacheException {
      Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
      IndexManager.testHook = new IndexManagerTestHook();
      Portfolio newPort = new Portfolio(cntDest + 1);
      CacheFactory.getAnyInstance().getLogger().fine("Shobhit: New Portfolio"+newPort);
      repRegion.put(Integer.valueOf(1), newPort);
      // above call must be hooked in BEFORE_UPDATE_OP call.
    }
  });

  server.invoke(new CacheSerializableRunnable("query on server") {
    @Override
    public void run2() throws CacheException {
      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
      Position pos1 = null;

      // ---- first pause of the put ----
      while (!hooked) {
        pause(100);
      }
      boolean firstCheckPassed = false;
      try {
        Object rs = qs.newQuery("<trace> select pos from /"+repRegionName+" p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: "+rs);
        assertTrue(rs instanceof SelectResults);
        pos1 = (Position) ((SelectResults) rs).iterator().next();
        if (!pos1.secId.equals("APPL")) {
          fail("Query thread did not verify index results even when RE is under update");
        }
        firstCheckPassed = true;
      } catch (Exception e) {
        fail("Query execution failed on server.", e);
      } finally {
        if (!firstCheckPassed) {
          // The test is aborting: remove the hook so the put is not
          // paused a second time with nobody left to release it.
          IndexManager.testHook = null;
        }
        hooked = false; // Let client put go further.
      }

      // ---- second pause of the put ----
      while (!hooked) {
        pause(100);
      }
      try {
        Object rs = qs.newQuery("<trace> select pos from /"+repRegionName+" p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: "+rs);
        assertTrue(rs instanceof SelectResults);
        if (((SelectResults) rs).size() > 0) {
          Position pos2 = (Position) ((SelectResults) rs).iterator().next();
          if (pos2.equals(pos1)) {
            fail("Query thread did not verify index results even when RE is under update and " +
                "RegionEntry value has been modified before releasing the lock");
          }
        }
      } catch (Exception e) {
        fail("Query execution failed on server.", e);
      } finally {
        // Remove the hook first, then release the put.
        IndexManager.testHook = null;
        hooked = false; // Let client put go further.
      }
    }
  });

  DistributedTestCase.join(putThread, 200, this.getLogWriter());
}
/**
 * Same scenario as testRangeIndex, but the index and the query use different
 * from clauses: the index is created over
 * {@code /region p, p.collectionHolderMap.values coll, p.positions.values pos}
 * while the query iterates only {@code /region p, p.positions.values pos}.
 *
 * The concurrent put replaces key 1 with a new {@code Portfolio(1)}. At the
 * first pause the query must return a Position whose secId is "APPL"; at the
 * second pause it must not return a Position equal to the first one.
 *
 * On any failure the hook is cleared and the put is released before the
 * failure propagates, so the put thread is not left spinning in the hook.
 */
public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch() {
  // Create caches
  Properties props = new Properties();
  server.invoke(PRClientServerTestBase.class, "createCacheInVm",
      new Object[] { props });

  server.invoke(new CacheSerializableRunnable("create indexes") {
    @Override
    public void run2() throws CacheException {
      cache = CacheFactory.getAnyInstance();
      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
      IndexManager.testHook = null;

      // Create common Portfolios and NewPortfolios
      Position.cnt = 0;
      for (int j = cnt; j < cntDest; j++) {
        region.put(Integer.valueOf(j), new Portfolio(j));
      }

      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
      try {
        Index index = qs.createIndex("posIndex", "pos.secId", "/"+repRegionName+" p, p.collectionHolderMap.values coll, p.positions.values pos");
        assertEquals(12, index.getStatistics().getNumberOfKeys());
      } catch (Exception e) {
        // Keep the cause so the failure report shows why creation failed.
        fail("Index creation failed", e);
      }
    }
  });

  // Start the update asynchronously; the test hook pauses it inside the
  // index update, first at one hook spot and then at a second one.
  AsyncInvocation putThread = server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
    @Override
    public void run2() throws CacheException {
      Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
      IndexManager.testHook = new IndexManagerTestHook();
      // This portfolio with same ID must have different positions.
      repRegion.put(Integer.valueOf(1), new Portfolio(1));
      // above call must be hooked in BEFORE_UPDATE_OP call.
    }
  });

  server.invoke(new CacheSerializableRunnable("query on server") {
    @Override
    public void run2() throws CacheException {
      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
      Position pos1 = null;

      // ---- first pause of the put ----
      while (!hooked) {
        pause(100);
      }
      boolean firstCheckPassed = false;
      try {
        Object rs = qs.newQuery("<trace> select pos from /"+repRegionName+" p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: "+rs);
        assertTrue(rs instanceof SelectResults);
        pos1 = (Position) ((SelectResults) rs).iterator().next();
        if (!pos1.secId.equals("APPL")) {
          fail("Query thread did not verify index results even when RE is under update");
        }
        firstCheckPassed = true;
      } catch (Exception e) {
        fail("Query execution failed on server.", e);
      } finally {
        if (!firstCheckPassed) {
          // The test is aborting: remove the hook so the put is not
          // paused a second time with nobody left to release it.
          IndexManager.testHook = null;
        }
        hooked = false; // Let client put go further.
      }

      // ---- second pause of the put ----
      while (!hooked) {
        pause(100);
      }
      try {
        Object rs = qs.newQuery("select pos from /"+repRegionName+" p, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
        assertTrue(rs instanceof SelectResults);
        if (((SelectResults) rs).size() > 0) {
          Position pos2 = (Position) ((SelectResults) rs).iterator().next();
          if (pos2.equals(pos1)) {
            fail("Query thread did not verify index results even when RE is under update and " +
                "RegionEntry value has been modified before releasing the lock");
          }
        }
      } catch (Exception e) {
        fail("Query execution failed on server.", e);
      } finally {
        // Remove the hook first, then release the put.
        IndexManager.testHook = null;
        hooked = false; // Let client put go further.
      }
    }
  });

  DistributedTestCase.join(putThread, 200, this.getLogWriter());
}
/**
 * Same scenario as testRangeIndex, but the index and the query use different
 * from clauses in the opposite direction: the index is created over
 * {@code /region p, p.positions.values pos} while the query additionally
 * iterates {@code p.collectionHolderMap.values coll}.
 *
 * The concurrent put replaces key 1 with a new {@code Portfolio(1)}. At the
 * first pause the query must return a Position whose secId is "APPL"; at the
 * second pause it must not return a Position equal to the first one.
 *
 * On any failure the hook is cleared and the put is released before the
 * failure propagates, so the put thread is not left spinning in the hook.
 */
public void testRangeIndexWithIndexAndQueryFromCluaseMisMatch2() {
  // Create caches
  Properties props = new Properties();
  server.invoke(PRClientServerTestBase.class, "createCacheInVm",
      new Object[] { props });

  server.invoke(new CacheSerializableRunnable("create indexes") {
    @Override
    public void run2() throws CacheException {
      cache = CacheFactory.getAnyInstance();
      Region region = cache.createRegionFactory(RegionShortcut.REPLICATE).create(repRegionName);
      IndexManager.testHook = null;

      // Create common Portfolios and NewPortfolios
      Position.cnt = 0;
      for (int j = cnt; j < cntDest; j++) {
        region.put(Integer.valueOf(j), new Portfolio(j));
      }

      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
      try {
        Index index = qs.createIndex("posIndex", "pos.secId", "/"+repRegionName+" p, p.positions.values pos");
        assertEquals(12, index.getStatistics().getNumberOfKeys());
      } catch (Exception e) {
        // Keep the cause so the failure report shows why creation failed.
        fail("Index creation failed", e);
      }
    }
  });

  // Start the update asynchronously; the test hook pauses it inside the
  // index update, first at one hook spot and then at a second one.
  AsyncInvocation putThread = server.invokeAsync(new CacheSerializableRunnable("update a Region Entry") {
    @Override
    public void run2() throws CacheException {
      Region repRegion = CacheFactory.getAnyInstance().getRegion(repRegionName);
      IndexManager.testHook = new IndexManagerTestHook();
      // This portfolio with same ID must have different positions.
      repRegion.put(Integer.valueOf(1), new Portfolio(1));
      // above call must be hooked in BEFORE_UPDATE_OP call.
    }
  });

  server.invoke(new CacheSerializableRunnable("query on server") {
    @Override
    public void run2() throws CacheException {
      QueryService qs = CacheFactory.getAnyInstance().getQueryService();
      Position pos1 = null;

      // ---- first pause of the put ----
      while (!hooked) {
        pause(100);
      }
      boolean firstCheckPassed = false;
      try {
        Object rs = qs.newQuery("<trace> select pos from /"+repRegionName+" p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
        CacheFactory.getAnyInstance().getLogger().fine("Shobhit: "+rs);
        assertTrue(rs instanceof SelectResults);
        pos1 = (Position) ((SelectResults) rs).iterator().next();
        if (!pos1.secId.equals("APPL")) {
          fail("Query thread did not verify index results even when RE is under update");
        }
        firstCheckPassed = true;
      } catch (Exception e) {
        fail("Query execution failed on server.", e);
      } finally {
        if (!firstCheckPassed) {
          // The test is aborting: remove the hook so the put is not
          // paused a second time with nobody left to release it.
          IndexManager.testHook = null;
        }
        hooked = false; // Let client put go further.
      }

      // ---- second pause of the put ----
      while (!hooked) {
        pause(100);
      }
      try {
        Object rs = qs.newQuery("select pos from /"+repRegionName+" p, p.collectionHolderMap.values coll, p.positions.values pos where pos.secId = 'APPL' AND p.ID = 1").execute();
        assertTrue(rs instanceof SelectResults);
        if (((SelectResults) rs).size() > 0) {
          Position pos2 = (Position) ((SelectResults) rs).iterator().next();
          if (pos2.equals(pos1)) {
            fail("Query thread did not verify index results even when RE is under update and " +
                "RegionEntry value has been modified before releasing the lock");
          }
        }
      } catch (Exception e) {
        fail("Query execution failed on server.", e);
      } finally {
        // Remove the hook first, then release the put.
        IndexManager.testHook = null;
        hooked = false; // Let client put go further.
      }
    }
  });

  DistributedTestCase.join(putThread, 200, this.getLogWriter());
}
/**
 * Static entry point that builds a throw-away test instance named "temp"
 * and delegates to its createProxyRegs() method.
 */
public static void createProxyRegions() {
  QueryDataInconsistencyDUnitTest delegate = new QueryDataInconsistencyDUnitTest("temp");
  delegate.createProxyRegs();
}
/**
 * Creates a PROXY client region named {@code repRegionName} on the cache
 * returned by CacheFactory.getAnyInstance(), which is cast to ClientCache.
 */
private void createProxyRegs() {
  ClientCache clientCache = (ClientCache) CacheFactory.getAnyInstance();
  clientCache
      .createClientRegionFactory(ClientRegionShortcut.PROXY)
      .create(repRegionName);
  /*cache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(
  PartitionedRegionName1);*/
}
/**
 * Static entry point that builds a throw-away test instance named "temp"
 * and delegates to its createPR() method.
 */
public static void createNewPR() {
  QueryDataInconsistencyDUnitTest delegate = new QueryDataInconsistencyDUnitTest("temp");
  delegate.createPR();
}
/**
 * Creates the partitioned region {@code PartitionedRegionName1} using the
 * PARTITION_REDUNDANT shortcut, {@code numOfBuckets} total buckets and a
 * QueryAPITestPartitionResolver. Also stores the cache returned by
 * CacheFactory.getAnyInstance() in the static {@code cache} field.
 */
public void createPR() {
  PartitionResolver resolver = new QueryAPITestPartitionResolver();
  cache = CacheFactory.getAnyInstance();

  PartitionAttributesFactory attributesFactory = new PartitionAttributesFactory()
      .setTotalNumBuckets(numOfBuckets)
      .setPartitionResolver(resolver);

  cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
      .setPartitionAttributes(attributesFactory.create())
      .create(PartitionedRegionName1);
}
/**
 * Static entry point that builds a throw-away test instance named "temp"
 * and delegates to its createCacheClientWithoutReg(host, port1) method.
 *
 * @param host  server host name passed to the client pool
 * @param port1 server port passed to the client pool
 */
public static void createCacheClientWithoutRegion(String host, Integer port1) {
  QueryDataInconsistencyDUnitTest delegate = new QueryDataInconsistencyDUnitTest("temp");
  delegate.createCacheClientWithoutReg(host, port1);
}
/**
 * Disconnects from the distributed system and then creates a client cache
 * whose pool points at the given server. No region is created.
 *
 * @param host  server host name added to the client pool
 * @param port1 server port added to the client pool
 */
private void createCacheClientWithoutReg(String host, Integer port1) {
  this.disconnectFromDS();
  ClientCacheFactory clientFactory = new ClientCacheFactory();
  clientFactory.addPoolServer(host, port1).create();
}
/**
 * Builds a runnable that puts portfolio objects into the named region. When
 * run, it stores {@code portfolio[j]} under key {@code Integer j} for every
 * {@code j} with {@code from <= j < to}, then logs the region name.
 *
 * @param regionName name of the region to populate; it is looked up in the
 *                   cache returned by CacheFactory.getAnyInstance() when the
 *                   runnable executes
 * @param portfolio  values to insert, indexed by key
 * @param from       first index to insert (inclusive)
 * @param to         index at which to stop (exclusive)
 * @return a CacheSerializableRunnable that performs the puts
 */
public CacheSerializableRunnable getCacheSerializableRunnableForPRPuts(
    final String regionName, final Object[] portfolio, final int from,
    final int to) {
  return new CacheSerializableRunnable("Region Puts") {
    @Override
    public void run2() throws CacheException {
      Cache localCache = CacheFactory.getAnyInstance();
      // Honor the caller-supplied region name. The lookup used to be
      // hard-coded to repRegionName, so the parameter was ignored even
      // though the log line below reports it.
      Region region = localCache.getRegion(regionName);
      for (int j = from; j < to; j++) {
        region.put(Integer.valueOf(j), portfolio[j]);
      }
      getLogWriter()
          .info(
              "PRQueryDUnitHelper#getCacheSerializableRunnableForPRPuts: Inserted Portfolio data on Region "
                  + regionName);
    }
  };
}
/**
 * Test hook installed into IndexManager.testHook by the put runnables. For
 * spots 9 and 10 it raises the {@code hooked} flag, logs a message and then
 * polls every 100 ms until another thread sets {@code hooked} back to false.
 * All other spots are ignored.
 */
public class IndexManagerTestHook implements com.gemstone.gemfire.cache.query.internal.index.IndexManager.TestHook{

  public void hook(final int spot) throws RuntimeException {
    if (spot == 9) {
      // Before Index update and after region entry lock.
      parkUntilReleased("QueryDataInconsistency.IndexManagerTestHook is hooked in Update Index Entry.");
    } else if (spot == 10) {
      // Before Region update and after Index Remove call.
      parkUntilReleased("QueryDataInconsistency.IndexManagerTestHook is hooked in Remove Index Entry.");
    }
  }

  /** Raises {@code hooked}, logs the message, and waits until the flag is cleared. */
  private void parkUntilReleased(String logMessage) {
    hooked = true;
    getLogWriter().info(logMessage);
    while (hooked) {
      pause(100);
    }
    assertEquals(hooked, false);
  }
}
}