blob: 1c4299df5bd34c6fa0b6b2f59a462d192659a748 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.internal.index;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.test.dunit.Assert.fail;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.CommitConflictException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.ClientCache;
import org.apache.geode.cache.client.ClientCacheFactory;
import org.apache.geode.cache.query.Index;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.QueryTestUtils;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.data.Position;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.cache30.CacheSerializableRunnable;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.NetworkUtils;
import org.apache.geode.test.dunit.SerializableCallable;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.OQLIndexTest;
@Category({OQLIndexTest.class})
public class CopyOnReadIndexDUnitTest extends JUnit4CacheTestCase {
VM vm0;
VM vm1;
VM vm2;
public CopyOnReadIndexDUnitTest() {
super();
}
@Override
public final void postSetUp() throws Exception {
getSystem();
Invoke.invokeInEveryVM(new SerializableRunnable("getSystem") {
@Override
public void run() {
getSystem();
}
});
Host host = Host.getHost(0);
vm0 = host.getVM(0);
vm1 = host.getVM(1);
vm2 = host.getVM(2);
}
@Override
public final void preTearDownCacheTestCase() throws Exception {
disconnectAllFromDS();
}
// test different queries against partitioned region
@Test
public void testPRQueryOnLocalNode() throws Exception {
QueryTestUtils utils = new QueryTestUtils();
configureServers();
helpTestPRQueryOnLocalNode(utils.queries.get("545"), 100, 100, true);
helpTestPRQueryOnLocalNode(utils.queries.get("546"), 100, 100, true);
helpTestPRQueryOnLocalNode(utils.queries.get("543"), 100, 100, true);
helpTestPRQueryOnLocalNode(utils.queries.get("544"), 100, 100, true);
helpTestPRQueryOnLocalNode("select * from /portfolios p where p.ID = 1", 100, 1, true);
helpTestPRQueryOnLocalNode(utils.queries.get("545"), 100, 100, false);
helpTestPRQueryOnLocalNode(utils.queries.get("546"), 100, 100, false);
helpTestPRQueryOnLocalNode(utils.queries.get("543"), 100, 100, false);
helpTestPRQueryOnLocalNode(utils.queries.get("544"), 100, 100, false);
helpTestPRQueryOnLocalNode("select * from /portfolios p where p.ID = 1", 100, 1, false);
}
// tests different queries with a transaction for replicated region
@Test
public void testTransactionsOnReplicatedRegion() throws Exception {
QueryTestUtils utils = new QueryTestUtils();
configureServers();
helpTestTransactionsOnReplicatedRegion(utils.queries.get("545"), 100, 100, true);
helpTestTransactionsOnReplicatedRegion(utils.queries.get("546"), 100, 100, true);
helpTestTransactionsOnReplicatedRegion(utils.queries.get("543"), 100, 100, true);
helpTestTransactionsOnReplicatedRegion(utils.queries.get("544"), 100, 100, true);
helpTestTransactionsOnReplicatedRegion("select * from /portfolios p where p.ID = 1", 100, 1,
true);
helpTestTransactionsOnReplicatedRegion(utils.queries.get("545"), 100, 100, false);
helpTestTransactionsOnReplicatedRegion(utils.queries.get("546"), 100, 100, false);
helpTestTransactionsOnReplicatedRegion(utils.queries.get("543"), 100, 100, false);
helpTestTransactionsOnReplicatedRegion(utils.queries.get("544"), 100, 100, false);
helpTestTransactionsOnReplicatedRegion("select * from /portfolios p where p.ID = 1", 100, 1,
false);
}
private void configureServers() throws Exception {
final int[] port = AvailablePortHelper.getRandomAvailableTCPPorts(3);
startCacheServer(vm0, port[0]);
startCacheServer(vm1, port[1]);
startCacheServer(vm2, port[2]);
}
// The tests sets up a partition region across 2 servers
// It does puts in each server, checking instance counts of portfolio objects
// Querying the data will result in deserialization of portfolio objects.
// In cases where index is present, the objects will be deserialized in the cache
public void helpTestPRQueryOnLocalNode(final String queryString, final int numPortfolios,
final int numExpectedResults, final boolean hasIndex) throws Exception {
final int numPortfoliosPerVM = numPortfolios / 2;
resetInstanceCount(vm0);
resetInstanceCount(vm1);
createPartitionRegion(vm0, "portfolios");
createPartitionRegion(vm1, "portfolios");
if (hasIndex) {
vm0.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
QueryTestUtils utils = new QueryTestUtils();
utils.createIndex("idIndex", "p.ID", "/portfolios p");
return null;
}
});
}
vm0.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region region = getCache().getRegion("/portfolios");
for (int i = 0; i < numPortfoliosPerVM; i++) {
Portfolio p = new Portfolio(i);
p.status = "testStatus";
p.positions = new HashMap();
p.positions.put("" + i, new Position("" + i, 20));
region.put("key " + i, p);
}
if (hasIndex) {
// operations we have done on this vm consist of:
// numPortfoliosPerVM instances of Portfolio created for put operation
// Due to index, we have deserialized all of the entries this vm currently host
Index index = getCache().getQueryService().getIndex(region, "idIndex");
GeodeAwaitility.await().untilAsserted(verifyPortfolioCount(
(int) index.getStatistics().getNumberOfValues() + numPortfoliosPerVM));
} else {
// operations we have done on this vm consist of:
// numPortfoliosPerVM instances of Portfolio created for put operation
// We do not have an index, so we have not deserialized any values
GeodeAwaitility.await().untilAsserted(verifyPortfolioCount(numPortfoliosPerVM));
}
return null;
}
});
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region region = getCache().getRegion("/portfolios");
for (int i = numPortfoliosPerVM; i < numPortfolios; i++) {
Portfolio p = new Portfolio(i);
p.status = "testStatus";
p.positions = new HashMap();
p.positions.put("" + i, new Position("" + i, 20));
region.put("key " + i, p);
}
// PR indexes are created across nodes unlike Replicated Region Indexes
if (hasIndex) {
// operations we have done on this vm consist of:
// numPortfoliosPerVM instances of Portfolio created for put operation
// Due to index, we have deserialized all of the entries this vm currently host
Index index = getCache().getQueryService().getIndex(region, "idIndex");
if (index == null) {
QueryTestUtils utils = new QueryTestUtils();
index = utils.createIndex("idIndex", "p.ID", "/portfolios p");
}
GeodeAwaitility.await().untilAsserted(verifyPortfolioCount(
(int) index.getStatistics().getNumberOfValues() + numPortfoliosPerVM));
} else {
// operations we have done on this vm consist of:
// numPortfoliosPerVM instances of Portfolio created for put operation
// We do not have an index, so we have not deserialized any values
GeodeAwaitility.await().untilAsserted(verifyPortfolioCount(numPortfoliosPerVM));
}
return null;
}
});
vm0.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region region = getCache().getRegion("/portfolios");
QueryService qs = getCache().getQueryService();
Query query = qs.newQuery(queryString);
SelectResults results = (SelectResults) query.execute();
Iterator it = results.iterator();
assertEquals("Failed:" + queryString, numExpectedResults, results.size());
for (Object o : results) {
if (o instanceof Portfolio) {
Portfolio p = (Portfolio) o;
p.status = "discardStatus";
} else {
Struct struct = (Struct) o;
Portfolio p = (Portfolio) struct.getFieldValues()[0];
p.status = "discardStatus";
}
}
if (hasIndex) {
// operations we have done on this vm consist of:
// 50 instances of Portfolio created for put operation
// Due to index, we have deserialized all of the entries this vm currently host
// Since we have deserialized and cached these values, we just need to add the number of
// results we did a copy of due to copy on read
Index index = getCache().getQueryService().getIndex(region, "idIndex");
GeodeAwaitility.await()
.untilAsserted(verifyPortfolioCount((int) index.getStatistics().getNumberOfValues()
+ numPortfoliosPerVM + numExpectedResults));
} else {
// operations we have done on this vm consist of:
// 50 instances of Portfolio created for put operation
// Due to the query we deserialized the number of entries this vm currently hosts
// We had to deserialized the results from the other data nodes when we iterated through
// the results as well as our own
GeodeAwaitility.await()
.untilAsserted(
verifyPortfolioCount((int) ((PartitionedRegion) region).getLocalSizeForTest()
+ numExpectedResults + numPortfoliosPerVM));
}
return null;
}
});
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region region = getCache().getRegion("/portfolios");
if (hasIndex) {
// After vm0 executed the query, we already had the values deserialized in our cache
// So it's the same total as before
GeodeAwaitility.await().untilAsserted(verifyPortfolioCount(
(int) ((PartitionedRegion) region).getLocalSizeForTest() + numPortfoliosPerVM));
} else {
// After vm0 executed the query, we had to deserialize the values in our vm
GeodeAwaitility.await().untilAsserted(verifyPortfolioCount(
(int) ((PartitionedRegion) region).getLocalSizeForTest() + numPortfoliosPerVM));
}
return null;
}
});
vm0.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region region = getCache().getRegion("/portfolios");
QueryService qs = getCache().getQueryService();
Query query = qs.newQuery(queryString);
SelectResults results = (SelectResults) query.execute();
assertEquals(numExpectedResults, results.size());
for (Object o : results) {
if (o instanceof Portfolio) {
Portfolio p = (Portfolio) o;
assertEquals("status should not have been changed", "testStatus", p.status);
} else {
Struct struct = (Struct) o;
Portfolio p = (Portfolio) struct.getFieldValues()[0];
assertEquals("status should not have been changed", "testStatus", p.status);
}
}
if (hasIndex) {
// operations we have done on this vm consist of:
// 50 instances of Portfolio created for put operation
// Due to index, we have deserialized all of the entries this vm currently host
// This is the second query, because we have deserialized and cached these values, we just
// need to add the number of results a second time
Index index = getCache().getQueryService().getIndex(region, "idIndex");
GeodeAwaitility.await()
.untilAsserted(verifyPortfolioCount((int) index.getStatistics().getNumberOfValues()
+ numExpectedResults + numExpectedResults + numPortfoliosPerVM));
} else {
// operations we have done on this vm consist of:
// 50 instances of Portfolio created for put operation
// Due to index, we have deserialized all of the entries this vm currently host
// This is the second query, because we have deserialized and cached these values, we just
// need to add the number of results a second time
// Because we have no index, we have to again deserialize all the values that this vm is
// hosting
GeodeAwaitility.await()
.untilAsserted(
verifyPortfolioCount((int) (((PartitionedRegion) region).getLocalSizeForTest()
+ ((PartitionedRegion) region).getLocalSizeForTest() + numExpectedResults
+ numExpectedResults + numPortfoliosPerVM)));
}
return null;
}
});
destroyRegion("portfolio", vm0);
}
public void helpTestTransactionsOnReplicatedRegion(final String queryString,
final int numPortfolios, final int numExpectedResults, final boolean hasIndex)
throws Exception {
resetInstanceCount(vm0);
resetInstanceCount(vm1);
resetInstanceCount(vm2);
createReplicatedRegion(vm0, "portfolios");
createReplicatedRegion(vm1, "portfolios");
createReplicatedRegion(vm2, "portfolios");
// In the case of replicated region hasPR really has no effect on serialization/deserialization
// counts
if (hasIndex) {
vm0.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
QueryTestUtils utils = new QueryTestUtils();
utils.createHashIndex("idIndex", "p.ID", "/portfolios p");
return null;
}
});
// let's not create index on vm1 to check different scenarios
vm2.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
QueryTestUtils utils = new QueryTestUtils();
utils.createHashIndex("idIndex", "p.ID", "/portfolios p");
return null;
}
});
}
vm0.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region region = getCache().getRegion("/portfolios");
for (int i = 0; i < numPortfolios; i++) {
Portfolio p = new Portfolio(i);
p.status = "testStatus";
p.positions = new HashMap();
p.positions.put("" + i, new Position("" + i, 20));
region.put("key " + i, p);
}
// We should have the same number of portfolio objects that we created for the put
GeodeAwaitility.await().untilAsserted(verifyPortfolioCount(numPortfolios));
return null;
}
});
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
// At this point, we should only have serialized values in this vm
Region region = getCache().getRegion("/portfolios");
GeodeAwaitility.await().untilAsserted(verifyPortfolioCount(0));
return null;
}
});
vm2.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
// There is an index for vm2, so we should have deserialized values at this point,
Region region = getCache().getRegion("/portfolios");
if (hasIndex) {
GeodeAwaitility.await().untilAsserted(verifyPortfolioCount(numPortfolios));
} else {
GeodeAwaitility.await().untilAsserted(verifyPortfolioCount(0));
}
return null;
}
});
// start transaction
// execute query
// modify results
// check instance count
vm0.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region region = getCache().getRegion("/portfolios");
CacheTransactionManager txManager = region.getCache().getCacheTransactionManager();
try {
txManager.begin();
QueryService qs = getCache().getQueryService();
Query query = qs.newQuery(queryString);
SelectResults results = (SelectResults) query.execute();
assertEquals(numExpectedResults, results.size());
for (Object o : results) {
if (o instanceof Portfolio) {
Portfolio p = (Portfolio) o;
p.status = "discardStatus";
} else {
Struct struct = (Struct) o;
Portfolio p = (Portfolio) struct.getFieldValues()[0];
p.status = "discardStatus";
}
}
txManager.commit();
} catch (CommitConflictException conflict) {
fail("commit conflict exception", conflict);
}
// We have created puts from our previous callable
// Now we have copied the results from the query
GeodeAwaitility.await()
.untilAsserted(verifyPortfolioCount(numExpectedResults + numPortfolios));
return null;
}
});
// Check objects in cache on vm1
vm1.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region region = getCache().getRegion("/portfolios");
QueryService qs = getCache().getQueryService();
Query query = qs.newQuery(queryString);
SelectResults results = (SelectResults) query.execute();
assertEquals(numExpectedResults, results.size());
for (Object o : results) {
if (o instanceof Portfolio) {
Portfolio p = (Portfolio) o;
assertEquals("status should not have been changed", "testStatus", p.status);
p.status = "discardStatus";
} else {
Struct struct = (Struct) o;
Portfolio p = (Portfolio) struct.getFieldValues()[0];
assertEquals("status should not have been changed", "testStatus", p.status);
p.status = "discardStatus";
}
}
// first it must deserialize the portfolios in the replicated region
// then we do a copy on read of these deserialized objects for the final result set
GeodeAwaitility.await()
.untilAsserted(verifyPortfolioCount(numExpectedResults + numPortfolios));
results = (SelectResults) query.execute();
assertEquals(numExpectedResults, results.size());
for (Object o : results) {
if (o instanceof Portfolio) {
Portfolio p = (Portfolio) o;
assertEquals("status should not have been changed", "testStatus", p.status);
} else {
Struct struct = (Struct) o;
Portfolio p = (Portfolio) struct.getFieldValues()[0];
assertEquals("status should not have been changed", "testStatus", p.status);
}
}
// we never created index on vm1
// so in this case, we always have to deserialize the value from the region
GeodeAwaitility.await()
.untilAsserted(verifyPortfolioCount(numPortfolios * 2 + numExpectedResults * 2));
return null;
}
});
// Check objects in cache on vm2
vm2.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region region = getCache().getRegion("/portfolios");
QueryService qs = getCache().getQueryService();
Query query = qs.newQuery(queryString);
SelectResults results = (SelectResults) query.execute();
assertEquals(numExpectedResults, results.size());
for (Object o : results) {
if (o instanceof Portfolio) {
Portfolio p = (Portfolio) o;
assertEquals("status should not have been changed", "testStatus", p.status);
p.status = "discardStatus";
} else {
Struct struct = (Struct) o;
Portfolio p = (Portfolio) struct.getFieldValues()[0];
assertEquals("status should not have been changed", "testStatus", p.status);
p.status = "discardStatus";
}
}
// with or without index, the values had to have been deserialized at one point
GeodeAwaitility.await()
.untilAsserted(verifyPortfolioCount(numPortfolios + numExpectedResults));
results = (SelectResults) query.execute();
assertEquals(numExpectedResults, results.size());
for (Object o : results) {
if (o instanceof Portfolio) {
Portfolio p = (Portfolio) o;
assertEquals("status should not have been changed", "testStatus", p.status);
} else {
Struct struct = (Struct) o;
Portfolio p = (Portfolio) struct.getFieldValues()[0];
assertEquals("status should not have been changed", "testStatus", p.status);
}
}
if (hasIndex) {
// we have an index, so the values are already deserialized
// total is now our original deserialization amount : numPortfolios
// two query results copied.
GeodeAwaitility.await()
.untilAsserted(verifyPortfolioCount(numPortfolios + numExpectedResults * 2));
} else {
// we never created index on vm1
// so in this case, we always have to deserialize the value from the region
GeodeAwaitility.await()
.untilAsserted(verifyPortfolioCount(numPortfolios * 2 + numExpectedResults * 2));
}
return null;
}
});
// Check objects in cache on vm0
vm0.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
Region region = getCache().getRegion("/portfolios");
QueryService qs = getCache().getQueryService();
Query query = qs.newQuery(queryString);
SelectResults results = (SelectResults) query.execute();
assertEquals(numExpectedResults, results.size());
for (Object o : results) {
if (o instanceof Portfolio) {
Portfolio p = (Portfolio) o;
assertEquals("status should not have been changed", "testStatus", p.status);
} else {
Struct struct = (Struct) o;
Portfolio p = (Portfolio) struct.getFieldValues()[0];
assertEquals("status should not have been changed", "testStatus", p.status);
}
}
// with or without index, the values we put in the region were already deserialized values
GeodeAwaitility.await()
.untilAsserted(verifyPortfolioCount(numExpectedResults * 2 + numPortfolios));
return null;
}
});
destroyRegion("portfolio", vm0);
}
private void destroyRegion(String regionName, VM... vms) {
for (VM vm : vms) {
vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
QueryTestUtils utils = new QueryTestUtils();
utils.getCache().getQueryService().removeIndexes();
Region region = getCache().getRegion("/portfolios");
region.destroyRegion();
return null;
}
});
}
}
private void createPartitionRegion(VM vm, String regionName) {
vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
QueryTestUtils utils = new QueryTestUtils();
utils.createPartitionRegion("portfolios", Portfolio.class);
return null;
}
});
}
private void createReplicatedRegion(VM vm, String regionName) {
vm.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
QueryTestUtils utils = new QueryTestUtils();
utils.createReplicateRegion("portfolios");
return null;
}
});
}
private void resetInstanceCount(VM vm) {
vm.invoke(new SerializableRunnable() {
@Override
public void run() {
Portfolio.instanceCount.set(0);
}
});
}
private void startCacheServer(VM server, final int port) throws Exception {
server.invoke(new SerializableCallable() {
@Override
public Object call() throws Exception {
getSystem(getServerProperties());
GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
cache.setCopyOnRead(true);
AttributesFactory factory = new AttributesFactory();
CacheServer cacheServer = getCache().addCacheServer();
cacheServer.setPort(port);
cacheServer.start();
QueryTestUtils.setCache(cache);
return null;
}
});
}
private void startClient(VM client, final VM server, final int port) {
client.invoke(new CacheSerializableRunnable("Start client") {
@Override
public void run2() throws CacheException {
Properties props = getClientProps();
getSystem(props);
final ClientCacheFactory ccf = new ClientCacheFactory(props);
ccf.addPoolServer(NetworkUtils.getServerHostName(server.getHost()), port);
ccf.setPoolSubscriptionEnabled(true);
ClientCache cache = (ClientCache) getClientCache(ccf);
}
});
}
protected Properties getClientProps() {
Properties p = new Properties();
p.setProperty(MCAST_PORT, "0");
p.setProperty(LOCATORS, "");
return p;
}
protected Properties getServerProperties() {
Properties p = new Properties();
p.setProperty(LOCATORS, "localhost[" + DistributedTestUtils.getDUnitLocatorPort() + "]");
return p;
}
private WaitCriterion verifyPortfolioCount(final int expected) {
return new WaitCriterion() {
private int expectedCount = expected;
@Override
public boolean done() {
return expectedCount == Portfolio.instanceCount.get();
}
@Override
public String description() {
return "verifying number of object instances created";
}
};
}
}