blob: 193aa70f0b30841a05be7537df3e876da80754c6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.functional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.text.ParseException;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.query.CacheUtils;
import org.apache.geode.cache.query.Index;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.Struct;
import org.apache.geode.cache.query.internal.QueryObserverAdapter;
import org.apache.geode.cache.query.internal.QueryObserverHolder;
import org.apache.geode.cache.query.internal.index.IndexManager;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.test.junit.categories.OQLIndexTest;
@Category({OQLIndexTest.class})
public class IndexOnEntrySetJUnitTest {
private static String testRegionName = "regionName";
private static Region testRegion;
private static int numElem = 100;
private String newValue = "NEW VALUE";
@Before
public void setUp() throws Exception {
System.setProperty(DistributionConfig.GEMFIRE_PREFIX + "Query.VERBOSE", "true");
CacheUtils.startCache();
}
@After
public void tearDown() throws Exception {
// Destroy current Region for other tests
IndexManager.testHook = null;
if (testRegion != null) {
testRegion.destroyRegion();
}
CacheUtils.closeCache();
}
private String[] getQueriesOnRegion(String regionName) {
return new String[] {
"SELECT DISTINCT entry.value, entry.key FROM /" + regionName
+ ".entrySet entry WHERE entry.key.PartitionID > 0 AND "
+ "entry.key.Index > 1 ORDER BY entry.key.Index ASC LIMIT 2",
"SELECT DISTINCT entry.value, entry.key FROM /" + regionName
+ ".entrySet entry WHERE entry.key.Index > 1 ORDER BY entry.key.Index ASC LIMIT 2",
"SELECT DISTINCT * FROM /" + regionName
+ ".entrySet entry WHERE entry.key.PartitionID > 0 AND "
+ "entry.key.Index > 1 ORDER BY entry.key.Index ASC LIMIT 2",
"SELECT DISTINCT entry.value, entry.key FROM /" + regionName
+ ".entrySet entry WHERE entry.key.PartitionID > 0 AND "
+ "entry.key.Index > 1 LIMIT 2",
"SELECT DISTINCT entry.value, entry.key FROM /" + regionName
+ ".entrySet entry WHERE entry.key.PartitionID > 0 AND "
+ "entry.key.Index > 1 ORDER BY entry.key.Index ASC",};
}
private String[] getQueriesOnRegionForPut(String regionName) {
return new String[] {
"SELECT DISTINCT entry.value, entry.key FROM /" + regionName
+ ".entrySet entry WHERE entry.key.PartitionID = 50 AND "
+ "entry.key.Index > 1 ORDER BY entry.key.Index ASC LIMIT 2",
"SELECT DISTINCT entry.value, entry.key FROM /" + regionName
+ ".entrySet entry WHERE entry.value = 50 AND "
+ "entry.key.Index > 1 ORDER BY entry.key.Index ASC LIMIT 2"};
}
/**
* Test queries with index on replicated regions and concurrent PUT, DESTORY, INVALIDATE
* operations. Make sure there is no UNDEFINED in the query result.
*/
@Test
public void testQueriesOnReplicatedRegion() throws Exception {
testRegion = createReplicatedRegion(testRegionName);
String regionPath = "/" + testRegionName + ".entrySet entry";
executeQueryTest(getQueriesOnRegion(testRegionName), "entry.key.Index", regionPath, 200);
}
@Test
public void testEntryDestroyedRaceWithSizeEstimateReplicatedRegion() throws Exception {
testRegion = createReplicatedRegion(testRegionName);
String regionPath = "/" + testRegionName + ".entrySet entry";
executeQueryTestDestroyDuringSizeEstimation(getQueriesOnRegion(testRegionName),
"entry.key.Index", regionPath, 201);
}
/**
* Test queries with index on partitioned regions and concurrent PUT, DESTORY, INVALIDATE
* operations. Make sure there is no UNDEFINED in the query result.
*/
@Test
public void testQueriesOnPartitionedRegion() throws Exception {
testRegion = createPartitionedRegion(testRegionName);
String regionPath = "/" + testRegionName + ".entrySet entry";
executeQueryTest(getQueriesOnRegion(testRegionName), "entry.key.Index", regionPath, 200);
}
private Region createReplicatedRegion(String regionName) throws ParseException {
Cache cache = CacheUtils.getCache();
AttributesFactory attributesFactory = new AttributesFactory();
attributesFactory.setDataPolicy(DataPolicy.REPLICATE);
RegionAttributes regionAttributes = attributesFactory.create();
return cache.createRegion(regionName, regionAttributes);
}
private Region createPartitionedRegion(String regionName) throws ParseException {
Cache cache = CacheUtils.getCache();
PartitionAttributesFactory prAttFactory = new PartitionAttributesFactory();
AttributesFactory attributesFactory = new AttributesFactory();
attributesFactory.setPartitionAttributes(prAttFactory.create());
RegionAttributes regionAttributes = attributesFactory.create();
return cache.createRegion(regionName, regionAttributes);
}
private void populateRegion(Region region) throws Exception {
for (int i = 1; i <= numElem; i++) {
putData(i, region);
}
}
private void putData(int id, Region region) throws ParseException {
region.put(new SomeKey(id, id), id);
}
private void clearData(Region region) {
Iterator it = region.entrySet().iterator();
while (it.hasNext()) {
Region.Entry entry = (Region.Entry) it.next();
region.destroy(entry.getKey());
}
}
/****
* Query Execution Helpers
****/
private void executeQueryTest(String[] queries, String indexedExpression, String regionPath,
int testHookSpot) throws Exception {
Cache cache = CacheUtils.getCache();
boolean[] booleanVals = {true, false};
for (String query : queries) {
for (boolean isDestroy : booleanVals) {
clearData(testRegion);
populateRegion(testRegion);
assertNotNull(cache.getRegion(testRegionName));
assertEquals(numElem, cache.getRegion(testRegionName).size());
if (isDestroy) {
helpTestFunctionalIndexForQuery(query, indexedExpression, regionPath,
new DestroyEntryTestHook(testRegion, testHookSpot), 1);
} else {
helpTestFunctionalIndexForQuery(query, indexedExpression, regionPath,
new InvalidateEntryTestHook(testRegion, testHookSpot), 1);
}
}
}
queries = getQueriesOnRegionForPut(testRegionName);
for (String query : queries) {
clearData(testRegion);
populateRegion(testRegion);
assertNotNull(cache.getRegion(testRegionName));
assertEquals(numElem, cache.getRegion(testRegionName).size());
helpTestFunctionalIndexForQuery(query, indexedExpression, regionPath,
new PutEntryTestHook(testRegion, testHookSpot), 1);
}
}
/**
* helper method to test against a functional index make sure there is no UNDEFINED result
*/
private SelectResults helpTestFunctionalIndexForQuery(String query, String indexedExpression,
String regionPath, AbstractTestHook testHook, int expectedSize) throws Exception {
MyQueryObserverAdapter observer = new MyQueryObserverAdapter();
QueryObserverHolder.setInstance(observer);
IndexManager.testHook = testHook;
QueryService qs = CacheUtils.getQueryService();
Index index = qs.createIndex("testIndex", indexedExpression, regionPath);
SelectResults indexedResults = (SelectResults) qs.newQuery(query).execute();
Iterator iterator = indexedResults.iterator();
while (iterator.hasNext()) {
Object row = iterator.next();
if (row instanceof Struct) {
Object[] fields = ((Struct) row).getFieldValues();
for (Object field : fields) {
assertTrue(field != QueryService.UNDEFINED);
if (field instanceof String) {
assertTrue(((String) field).compareTo(newValue) != 0);
}
}
} else {
assertTrue(row != QueryService.UNDEFINED);
if (row instanceof String) {
assertTrue(((String) row).compareTo(newValue) != 0);
}
}
}
assertTrue(indexedResults.size() >= expectedSize);
assertTrue(observer.indexUsed);
assertTrue(((AbstractTestHook) IndexManager.testHook).isTestHookCalled());
((AbstractTestHook) IndexManager.testHook).reset();
qs.removeIndex(index);
return indexedResults;
}
private void executeQueryTestDestroyDuringSizeEstimation(String[] queries,
String indexedExpression, String regionPath, int testHookSpot) throws Exception {
Cache cache = CacheUtils.getCache();
for (String query : queries) {
clearData(testRegion);
populateRegion(testRegion);
assertNotNull(cache.getRegion(testRegionName));
assertEquals(numElem, cache.getRegion(testRegionName).size());
helpTestFunctionalIndexForQuery(query, indexedExpression, regionPath,
new DestroyEntryTestHook(testRegion, testHookSpot), 0);
}
}
class MyQueryObserverAdapter extends QueryObserverAdapter {
public boolean indexUsed = false;
@Override
public void afterIndexLookup(Collection results) {
super.afterIndexLookup(results);
indexUsed = true;
}
}
class SomeKey {
public int Index = 1;
public int PartitionID = 1;
public SomeKey(int index, int partitionId) {
this.Index = index;
this.PartitionID = partitionId;
}
public boolean equals(Object other) {
if (other instanceof SomeKey) {
SomeKey otherKey = (SomeKey) other;
return this.Index == otherKey.Index && this.PartitionID == otherKey.PartitionID;
}
return false;
}
public String toString() {
return "somekey:" + Index + "," + PartitionID;
}
}
/**
* Test hook
*/
abstract class AbstractTestHook implements IndexManager.TestHook {
boolean isTestHookCalled = false;
Region r;
private int testHookSpot;
public AbstractTestHook(int testHookSpot) {
this.testHookSpot = testHookSpot;
}
public void reset() {
isTestHookCalled = false;
}
public boolean isTestHookCalled() {
return isTestHookCalled;
}
/**
* Subclass override with different operation
*/
public abstract void doOp();
@Override
public void hook(int spot) {
if (spot == testHookSpot) {
if (!isTestHookCalled) {
isTestHookCalled = true;
CompletableFuture.runAsync(this::doOp).join();
}
}
}
}
class DestroyEntryTestHook extends AbstractTestHook {
DestroyEntryTestHook(Region r, int testHookSpot) {
super(testHookSpot);
this.r = r;
}
@Override
public void doOp() {
Iterator it = r.entrySet().iterator();
while (it.hasNext()) {
Region.Entry entry = (Region.Entry) it.next();
r.destroy(entry.getKey());
}
}
}
class InvalidateEntryTestHook extends AbstractTestHook {
InvalidateEntryTestHook(Region r, int testHookSpot) {
super(testHookSpot);
this.r = r;
}
@Override
public void doOp() {
Iterator it = r.entrySet().iterator();
while (it.hasNext()) {
Region.Entry entry = (Region.Entry) it.next();
r.invalidate(entry.getKey());
}
}
}
class PutEntryTestHook extends AbstractTestHook {
PutEntryTestHook(Region r, int testHookSpot) {
super(testHookSpot);
this.r = r;
}
@Override
public void doOp() {
Iterator it = r.entrySet().iterator();
while (it.hasNext()) {
Region.Entry entry = (Region.Entry) it.next();
r.put(entry.getKey(), newValue);
}
}
}
}