blob: 347a7ac32d6f792e2073b705109f4c66995b8d19 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.internal.aggregate;
import static org.apache.geode.cache.query.internal.aggregate.AbstractAggregator.downCast;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import junitparams.Parameters;
import junitparams.naming.TestCaseName;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.query.Query;
import org.apache.geode.cache.query.QueryService;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.data.Portfolio;
import org.apache.geode.cache.query.data.PortfolioPdx;
import org.apache.geode.test.junit.categories.OQLQueryTest;
@Category(OQLQueryTest.class)
public class SumIntegrationTest extends AggregateFunctionQueryBaseIntegrationTest {
private Map<String, Number> queries = new HashMap<>();
private Map<String, Number> equiJoinQueries = new HashMap<>();
private void prepareStructures() {
Supplier<Stream<Portfolio>> supplierOne =
() -> regionOneLocalCopy.values().stream().map(Portfolio.class::cast);
Supplier<Stream<Portfolio>> supplierTwo =
() -> regionTwoLocalCopy.values().stream().map(Portfolio.class::cast);
for (int i = 1; i <= 70; i++) {
Portfolio portfolio = new Portfolio(i);
portfolio.shortID = (short) ((short) i / 5);
regionOneLocalCopy.put(i, portfolio);
}
for (int i = 50; i <= 100; i++) {
Portfolio portfolio = new Portfolio(i);
portfolio.shortID = (short) ((short) i / 5);
regionTwoLocalCopy.put(i, portfolio);
}
// Simple Queries
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName + " p",
downCast(supplierOne.get().mapToInt(Portfolio::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE p.ID > 0",
downCast(supplierOne.get().filter(p -> p.getID() > 0).mapToInt(Portfolio::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE p.ID > 0 OR p.ID <= 100",
downCast(supplierOne.get().filter(p -> p.getID() > 0 || p.getID() <= 100)
.mapToInt(Portfolio::getID).sum()));
queries.put(
"SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE p.ID > 0 OR p.status='active'",
downCast(supplierOne.get().filter(p -> p.getID() > 0 || p.isActive())
.mapToInt(Portfolio::getID).sum()));
queries.put(
"SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE p.ID > 0 OR p.status LIKE 'ina%'",
downCast(supplierOne.get().filter(p -> p.getID() > 0 || p.status.startsWith("ina"))
.mapToInt(Portfolio::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE p.ID IN SET(1, 2, 3, 4, 5)",
downCast(supplierOne.get().filter(p -> Arrays.asList(1, 2, 3, 4, 5).contains(p.getID()))
.mapToInt(Portfolio::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE NOT (p.ID > 5)",
downCast(supplierOne.get().filter(p -> p.getID() <= 5).mapToInt(Portfolio::getID).sum()));
// Distinct Queries
queries.put("SELECT SUM(DISTINCT p.ID) FROM /" + firstRegionName + " p WHERE p.ID > 0",
downCast(supplierOne.get().filter(p -> p.getID() > 0)
.filter(distinctByKey(Portfolio::getID)).mapToInt(Portfolio::getID).sum()));
queries.put("SELECT SUM(DISTINCT p.shortID) FROM /" + firstRegionName + " p WHERE p.ID > 0",
downCast(supplierOne.get().filter(p -> p.getID() > 0).filter(distinctByKey(p -> p.shortID))
.mapToInt(p -> p.shortID).sum()));
// StructSet queries.
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName
+ " p, p.positions.values pos WHERE p.ID > 0 AND pos.secId = 'IBM'",
downCast(supplierOne.get().filter(p -> p.getID() > 0 && p.getPositions().containsKey("IBM"))
.mapToInt(Portfolio::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName
+ " p, p.positions.values pos WHERE p.ID < 100 AND pos.secId = 'IBM'",
downCast(
supplierOne.get().filter(p -> p.getID() < 100 && p.getPositions().containsKey("IBM"))
.mapToInt(Portfolio::getID).sum()));
queries.put("SELECT DISTINCT SUM(p.ID) FROM /" + firstRegionName
+ " p, p.positions.values pos WHERE p.ID > 0 AND pos.secId = 'IBM'",
downCast(supplierOne.get().filter(p -> p.getID() > 0 && p.getPositions().containsKey("IBM"))
.distinct().mapToInt(Portfolio::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName
+ " p, p.positions.values pos WHERE p.ID > 0 AND p.status = 'active' AND pos.secId = 'IBM'",
downCast(supplierOne.get()
.filter(p -> p.getID() > 0 && p.isActive() && p.getPositions().containsKey("IBM"))
.mapToInt(Portfolio::getID).sum()));
// Aggregate used as as WHERE condition within inner query.
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName
+ " p WHERE p.ID IN (SELECT MIN(o.ID) FROM /" + firstRegionName + " o)",
downCast(supplierOne.get()
.filter(p -> p.getID() == supplierOne.get().mapToInt(Portfolio::getID).min().orElse(-1))
.mapToInt(Portfolio::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName
+ " p WHERE p.ID = ELEMENT(SELECT MAX(o.ID) FROM /" + firstRegionName + " o)",
downCast(supplierOne.get()
.filter(p -> p.getID() == supplierOne.get().mapToInt(Portfolio::getID).max().orElse(-1))
.mapToInt(Portfolio::getID).sum()));
// Equi Join Queries
equiJoinQueries.put("SELECT SUM(p.ID) from /" + firstRegionName + " p, /" + secondRegionName
+ " e WHERE p.ID = e.ID AND p.ID > 0",
downCast(supplierOne.get().filter(p -> regionTwoLocalCopy.containsKey(p.getID()))
.filter(p -> p.getID() > 0).mapToInt(Portfolio::getID).sum()));
equiJoinQueries.put("SELECT SUM(p.ID) from /" + firstRegionName + " p, /" + secondRegionName
+ " e WHERE p.ID = e.ID AND p.ID > 20 AND e.ID > 40",
downCast(supplierOne.get()
.filter(p -> supplierTwo.get().filter(e -> e.getID() > 40)
.collect(Collectors.toMap(Portfolio::getID, Function.identity()))
.containsKey(p.getID()))
.filter(p -> p.getID() > 20).mapToInt(Portfolio::getID).sum()));
equiJoinQueries.put("SELECT SUM(p.ID) from /" + firstRegionName + " p, /" + secondRegionName
+ " e WHERE p.ID = e.ID AND p.ID > 0 AND p.status = 'active'",
downCast(supplierOne.get().filter(p -> regionTwoLocalCopy.containsKey(p.getID()))
.filter(p -> p.getID() > 0 && p.isActive()).mapToInt(Portfolio::getID).sum()));
}
private void prepareStructuresWithPdx() {
Supplier<Stream<PortfolioPdx>> supplierOne =
() -> regionOneLocalCopy.values().stream().map(PortfolioPdx.class::cast);
Supplier<Stream<PortfolioPdx>> supplierTwo =
() -> regionTwoLocalCopy.values().stream().map(PortfolioPdx.class::cast);
for (int i = 1; i <= 70; i++) {
PortfolioPdx portfolio = new PortfolioPdx(i);
portfolio.shortID = (short) ((short) i / 5);
regionOneLocalCopy.put(i, portfolio);
}
for (int i = 50; i <= 100; i++) {
PortfolioPdx portfolio = new PortfolioPdx(i);
portfolio.shortID = (short) ((short) i / 5);
regionTwoLocalCopy.put(i, portfolio);
}
// Simple Queries
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName + " p",
downCast(supplierOne.get().mapToInt(PortfolioPdx::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE p.ID > 0",
downCast(supplierOne.get().filter(p -> p.getID() > 0).mapToInt(PortfolioPdx::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE p.ID > 0 OR p.ID <= 100",
downCast(supplierOne.get().filter(p -> p.getID() > 0 || p.getID() <= 100)
.mapToInt(PortfolioPdx::getID).sum()));
queries.put(
"SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE p.ID > 0 OR p.status='active'",
downCast(supplierOne.get().filter(p -> p.getID() > 0 || p.isActive())
.mapToInt(PortfolioPdx::getID).sum()));
queries.put(
"SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE p.ID > 0 OR p.status LIKE 'ina%'",
downCast(supplierOne.get().filter(p -> p.getID() > 0 || p.status.startsWith("ina"))
.mapToInt(PortfolioPdx::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE p.ID IN SET(1, 2, 3, 4, 5)",
downCast(supplierOne.get().filter(p -> Arrays.asList(1, 2, 3, 4, 5).contains(p.getID()))
.mapToInt(PortfolioPdx::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName + " p WHERE NOT (p.ID > 5)",
downCast(
supplierOne.get().filter(p -> p.getID() <= 5).mapToInt(PortfolioPdx::getID).sum()));
// Distinct Queries
queries.put("SELECT SUM(DISTINCT p.ID) FROM /" + firstRegionName + " p WHERE p.ID > 0",
downCast(supplierOne.get().filter(p -> p.getID() > 0)
.filter(distinctByKey(PortfolioPdx::getID)).mapToInt(PortfolioPdx::getID).sum()));
queries.put("SELECT SUM(DISTINCT p.shortID) FROM /" + firstRegionName + " p WHERE p.ID > 0",
downCast(supplierOne.get().filter(p -> p.getID() > 0).filter(distinctByKey(p -> p.shortID))
.mapToInt(p -> p.shortID).sum()));
// StructSet queries.
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName
+ " p, p.positions.values pos WHERE p.ID > 0 AND pos.secId = 'IBM'",
downCast(supplierOne.get().filter(p -> p.getID() > 0 && p.getPositions().containsKey("IBM"))
.mapToInt(PortfolioPdx::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName
+ " p, p.positions.values pos WHERE p.ID < 100 AND pos.secId = 'IBM'",
downCast(
supplierOne.get().filter(p -> p.getID() < 100 && p.getPositions().containsKey("IBM"))
.mapToInt(PortfolioPdx::getID).sum()));
queries.put("SELECT DISTINCT SUM(p.ID) FROM /" + firstRegionName
+ " p, p.positions.values pos WHERE p.ID > 0 AND pos.secId = 'IBM'",
downCast(supplierOne.get().filter(p -> p.getID() > 0 && p.getPositions().containsKey("IBM"))
.distinct().mapToInt(PortfolioPdx::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName
+ " p, p.positions.values pos WHERE p.ID > 0 AND p.status = 'active' AND pos.secId = 'IBM'",
downCast(supplierOne.get()
.filter(p -> p.getID() > 0 && p.isActive() && p.getPositions().containsKey("IBM"))
.mapToInt(PortfolioPdx::getID).sum()));
// Aggregate used as as WHERE condition within inner query.
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName
+ " p WHERE p.ID IN (SELECT MIN(o.ID) FROM /" + firstRegionName + " o)",
downCast(supplierOne.get()
.filter(
p -> p.getID() == supplierOne.get().mapToInt(PortfolioPdx::getID).min().orElse(-1))
.mapToInt(PortfolioPdx::getID).sum()));
queries.put("SELECT SUM(p.ID) FROM /" + firstRegionName
+ " p WHERE p.ID = ELEMENT(SELECT MAX(o.ID) FROM /" + firstRegionName + " o)",
downCast(supplierOne.get()
.filter(
p -> p.getID() == supplierOne.get().mapToInt(PortfolioPdx::getID).max().orElse(-1))
.mapToInt(PortfolioPdx::getID).sum()));
// Equi Join Queries
equiJoinQueries.put("SELECT SUM(p.ID) from /" + firstRegionName + " p, /" + secondRegionName
+ " e WHERE p.ID = e.ID AND p.ID > 0",
downCast(supplierOne.get().filter(p -> regionTwoLocalCopy.containsKey(p.getID()))
.filter(p -> p.getID() > 0).mapToInt(PortfolioPdx::getID).sum()));
equiJoinQueries.put("SELECT SUM(p.ID) from /" + firstRegionName + " p, /" + secondRegionName
+ " e WHERE p.ID = e.ID AND p.ID > 20 AND e.ID > 40",
downCast(supplierOne.get()
.filter(p -> supplierTwo.get().filter(e -> e.getID() > 40)
.collect(Collectors.toMap(PortfolioPdx::getID, Function.identity()))
.containsKey(p.getID()))
.filter(p -> p.getID() > 20).mapToInt(PortfolioPdx::getID).sum()));
equiJoinQueries.put("SELECT SUM(p.ID) from /" + firstRegionName + " p, /" + secondRegionName
+ " e WHERE p.ID = e.ID AND p.ID > 0 AND p.status = 'active'",
downCast(supplierOne.get().filter(p -> regionTwoLocalCopy.containsKey(p.getID()))
.filter(p -> p.getID() > 0 && p.isActive()).mapToInt(PortfolioPdx::getID).sum()));
}
public void parametrizedSetUp(boolean usePdx) {
if (!usePdx) {
prepareStructures();
} else {
prepareStructuresWithPdx();
}
}
@Test
@Parameters({
"LOCAL, true", "LOCAL, false", "REPLICATE, true",
"REPLICATE, false", "PARTITION, true", "PARTITION, false"
})
@TestCaseName("[{index}] {method}(RegionType:{0},PDX:{1})")
public void sumShouldWorkCorrectlyOnDifferentRegionTypes(RegionShortcut regionShortcut,
boolean usePdx) throws Exception {
parametrizedSetUp(usePdx);
createAndPopulateRegion(firstRegionName, regionShortcut, regionOneLocalCopy);
QueryService queryService = server.getCache().getQueryService();
for (String queryStr : queries.keySet()) {
Query query = queryService.newQuery(queryStr);
SelectResults result = (SelectResults) query.execute();
assertThat(result.size()).isEqualTo(1);
assertThat(result.asList().get(0)).isInstanceOf(Number.class);
assertThat(result.asList().get(0))
.as(String.format("Query %s didn't return expected results", queryStr))
.isEqualTo(queries.get(queryStr));
}
}
@Test
@Parameters({
"LOCAL, true", "LOCAL, false", "REPLICATE, true",
"REPLICATE, false", "PARTITION, true", "PARTITION, false"
})
@TestCaseName("[{index}] {method}(RegionType:{0},PDX:{1})")
public void sumShouldWorkCorrectlyOnDifferentRegionTypesWithIndexes(RegionShortcut regionShortcut,
boolean usePdx) throws Exception {
parametrizedSetUp(usePdx);
createRegion(firstRegionName, regionShortcut);
QueryService queryService = server.getCache().getQueryService();
queryService.createIndex("sampleIndex-1", "p.ID", "/" + firstRegionName + " p");
queryService.createIndex("sampleIndex-2", "p.status", "/" + firstRegionName + " p");
queryService.createIndex("sampleIndex-3", "pos.secId",
"/" + firstRegionName + " p, p.positions.values pos");
await().untilAsserted(() -> assertThat(queryService.getIndexes().size()).isEqualTo(3));
populateRegion(firstRegionName, regionOneLocalCopy);
for (String queryStr : queries.keySet()) {
Query query = queryService.newQuery(queryStr);
SelectResults result = (SelectResults) query.execute();
assertThat(result.size()).isEqualTo(1);
assertThat(result.asList().get(0)).isInstanceOf(Number.class);
assertThat(result.asList().get(0))
.as(String.format("Query %s didn't return expected results", queryStr))
.isEqualTo(queries.get(queryStr));
}
}
@Test
@TestCaseName("[{index}] {method}(RegionType:{0},PDX:{1})")
@Parameters({"LOCAL, true", "LOCAL, false", "REPLICATE, true", "REPLICATE, false"})
public void sumWithEquiJoinShouldWorkCorrectlyOnDifferentRegionTypes(
RegionShortcut regionShortcut, boolean usePdx) throws Exception {
parametrizedSetUp(usePdx);
createAndPopulateRegion(firstRegionName, regionShortcut, regionOneLocalCopy);
createAndPopulateRegion(secondRegionName, regionShortcut, regionTwoLocalCopy);
QueryService queryService = server.getCache().getQueryService();
for (String queryStr : equiJoinQueries.keySet()) {
Query query = queryService.newQuery(queryStr);
SelectResults result = (SelectResults) query.execute();
assertThat(result.size()).isEqualTo(1);
assertThat(result.asList().get(0)).isInstanceOf(Number.class);
assertThat(result.asList().get(0))
.as(String.format("Query %s didn't return expected results", queryStr))
.isEqualTo(equiJoinQueries.get(queryStr));
}
}
@Test
@TestCaseName("[{index}] {method}(RegionType:{0},PDX:{1})")
@Parameters({"LOCAL, true", "LOCAL, false", "REPLICATE, true", "REPLICATE, false"})
public void sumWithEquiJoinShouldWorkCorrectlyOnDifferentRegionTypesWithIndexes(
RegionShortcut regionShortcut, boolean usePdx) throws Exception {
parametrizedSetUp(usePdx);
createRegion(firstRegionName, regionShortcut);
createRegion(secondRegionName, regionShortcut);
QueryService queryService = server.getCache().getQueryService();
queryService.createIndex("sampleIndex-1", "p.ID", "/" + firstRegionName + " p");
queryService.createIndex("sampleIndex-2", "p.status", "/" + firstRegionName + " p");
queryService.createIndex("sampleIndex-3", "e.ID", "/" + secondRegionName + " e");
queryService.createIndex("sampleIndex-4", "e.status", "/" + secondRegionName + " e");
await().untilAsserted(() -> assertThat(queryService.getIndexes().size()).isEqualTo(4));
populateRegion(firstRegionName, regionOneLocalCopy);
populateRegion(secondRegionName, regionTwoLocalCopy);
for (String queryStr : equiJoinQueries.keySet()) {
Query query = queryService.newQuery(queryStr);
SelectResults result = (SelectResults) query.execute();
assertThat(result.size()).isEqualTo(1);
assertThat(result.asList().get(0)).isInstanceOf(Number.class);
assertThat(result.asList().get(0))
.as(String.format("Query %s didn't return expected results", queryStr))
.isEqualTo(equiJoinQueries.get(queryStr));
}
}
}