blob: f577c5983aa3f925a4159340cadbdd3c438a2689 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query;
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityKey;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.processors.query.h2.twostep.ReduceIndex;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.util.StringUtils;
/**
* Tests for correct distributed partitioned queries.
*/
@SuppressWarnings("unchecked")
public class IgniteSqlSplitterSelfTest extends AbstractIndexingCommonTest {
/** */
private static final int CLIENT = 7;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
CacheKeyConfiguration keyCfg = new CacheKeyConfiguration(TestKey.class.getName(), "affKey");
cfg.setCacheKeyConfiguration(keyCfg);
cfg.setPeerClassLoadingEnabled(false);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGridsMultiThreaded(3, false);
startClientGrid(CLIENT);
}
/**
* @param name Cache name.
* @param partitioned Partition or replicated cache.
* @param idxTypes Indexed types.
* @return Cache configuration.
*/
private static CacheConfiguration cacheConfig(String name, boolean partitioned, Class<?>... idxTypes) {
return new CacheConfiguration(DEFAULT_CACHE_NAME)
.setName(name)
.setCacheMode(partitioned ? CacheMode.PARTITIONED : CacheMode.REPLICATED)
.setAtomicityMode(CacheAtomicityMode.ATOMIC)
.setBackups(1)
.setIndexedTypes(idxTypes);
}
/**
* Tests offset and limit clauses for query.
* @throws Exception If failed.
*/
@Test
public void testOffsetLimit() throws Exception {
IgniteCache<Integer, Integer> c = ignite(0).getOrCreateCache(cacheConfig("ints", true,
Integer.class, Integer.class));
try {
awaitPartitionMapExchange();
List<Integer> res = new ArrayList<>();
Random rnd = new GridRandom();
for (int i = 0; i < 10; i++) {
int val = rnd.nextInt(100);
c.put(i, val);
res.add(val);
}
Collections.sort(res);
String qry = "select _val from Integer order by _val ";
assertEqualsCollections(res, columnQuery(c, qry));
assertEqualsCollections(res.subList(0, 0), columnQuery(c, qry + "limit ?", 0));
assertEqualsCollections(res.subList(0, 3), columnQuery(c, qry + "limit ?", 3));
assertEqualsCollections(res.subList(0, 9), columnQuery(c, qry + "limit ? offset ?", 9, 0));
assertEqualsCollections(res.subList(3, 7), columnQuery(c, qry + "limit ? offset ?", 4, 3));
assertEqualsCollections(res.subList(7, 9), columnQuery(c, qry + "limit ? offset ?", 2, 7));
assertEqualsCollections(res.subList(8, 10), columnQuery(c, qry + "limit ? offset ?", 2, 8));
assertEqualsCollections(res.subList(9, 10), columnQuery(c, qry + "limit ? offset ?", 1, 9));
assertEqualsCollections(res.subList(10, 10), columnQuery(c, qry + "limit ? offset ?", 1, 10));
assertEqualsCollections(res.subList(9, 10), columnQuery(c, qry + "limit ? offset abs(-(4 + ?))", 1, 5));
}
finally {
c.destroy();
}
}
/**
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-10199")
@Test
public void testMergeJoin() {
IgniteCache<Integer, Org> c = ignite(CLIENT).getOrCreateCache(cacheConfig("org", true,
Integer.class, Org.class));
try {
String qry = "select o1.* from Org o1, " +
"(select max(o.name) as name, o.id from Org o group by o.id) o2 " +
"where o1.id = o2.id";
List<List<?>> plan = c.query(new SqlFieldsQuery("explain " + qry)
.setEnforceJoinOrder(true)).getAll();
X.println("Plan: " + plan);
String map0 = (String)plan.get(0).get(0);
String map1 = (String)plan.get(1).get(0);
String rdc = (String)plan.get(2).get(0);
assertTrue(map0.contains("ORDER BY"));
assertTrue(map1.contains("ORDER BY"));
assertEquals(3, rdc.split("merge_sorted").length);
}
finally {
c.destroy();
}
}
/** */
@Test
public void testPushDownSubquery() {
IgniteCache<Integer, Person> c = ignite(CLIENT).getOrCreateCache(cacheConfig("ps", true,
Integer.class, Person.class));
try {
String subqry = "(select max(p.id) as id, p.depId from Person p group by p.depId)";
// Subquery in where clause.
String qry = "select 1 from Person p0 join " + subqry + " p1 on p0.id = p1.id where p0.id = " +
" (select p.id from Person p where p.id = p0.id)";
assertEquals(0, c.query(new SqlFieldsQuery(qry)).getAll().size());
List<List<?>> plan = c.query(new SqlFieldsQuery("explain " + qry)).getAll();
X.println(" Plan: " + plan);
assertEquals(3, plan.size());
String rdc = (String)plan.get(2).get(0);
assertFalse(rdc.contains("PERSON"));
qry = "select (select p.id from Person p where p.id = p0.id) from Person p0 join " +
subqry + " p1 on p0.id = p1.id";
assertEquals(0, c.query(new SqlFieldsQuery(qry)).getAll().size());
plan = c.query(new SqlFieldsQuery("explain " + qry)).getAll();
X.println(" Plan: " + plan);
assertEquals(3, plan.size());
rdc = (String)plan.get(2).get(0);
assertFalse(rdc.contains("PERSON"));
}
finally {
c.destroy();
}
}
/**
*/
@Test
public void testPushDown() {
IgniteCache<Integer, Person> c = ignite(CLIENT).getOrCreateCache(cacheConfig("ps", true,
Integer.class, Person.class));
try {
String subqry = "(select max(p.id) as id, p.depId from Person p group by p.depId)";
for (int i = 0; i < 5; i++) {
SB qry = new SB("select * from ");
for (int j = 0; j < 5; j++) {
if (j != 0)
qry.a(", ");
if (j == i)
qry.a(subqry);
else
qry.a("Person");
qry.a(" p").a(j);
}
qry.a(" where");
for (int j = 1; j < 5; j++) {
if (j != 1)
qry.a(" and");
qry.a(" p").a(j - 1).a(".id").a(" = ").a("p").a(j).a(".depId");
}
c.query(new SqlFieldsQuery(qry.toString())
.setEnforceJoinOrder(true)).getAll();
X.println("\nPlan:\n" +
c.query(new SqlFieldsQuery("explain " + qry.toString())
.setEnforceJoinOrder(true)).getAll());
}
}
finally {
c.destroy();
}
}
/**
*/
@Test
public void testPushDownLeftJoin() {
IgniteCache<Integer, Person> c = ignite(0).getOrCreateCache(cacheConfig("ps", true,
Integer.class, Person.class));
try {
String subqryAgg = "(select max(p.id) as id, p.depId from Person p group by p.depId)";
String subqrySimple = "(select p.id, p.depId from Person p)";
for (int i = 0; i < 5; i++) {
for (int k = 0; k < 5; k++) {
SB qry = new SB("select * from ");
for (int j = 0; j < 5; j++) {
if (j != 0)
qry.a(j == i ? " left join " : " join ");
if (j == 2)
qry.a(subqryAgg);
else
qry.a(j == k ? subqrySimple : "Person");
qry.a(" p").a(j);
if (j != 0) {
qry.a(" on ");
qry.a(" p0.id").a(" = ").a("p").a(j).a(".depId");
}
}
X.println(" ---> ik: : " + i + " " + k);
X.println("\nqry: \n" + qry.toString());
c.query(new SqlFieldsQuery(qry.toString())
.setEnforceJoinOrder(true)).getAll();
X.println("\nPlan:\n" +
c.query(new SqlFieldsQuery("explain " + qry.toString())
.setEnforceJoinOrder(true)).getAll());
}
}
}
finally {
c.destroy();
}
}
/**
*/
@Test
public void testReplicatedTablesUsingPartitionedCache() {
doTestReplicatedTablesUsingPartitionedCache(1, false, false);
}
/**
*/
@Test
public void testReplicatedTablesUsingPartitionedCacheSegmented() {
doTestReplicatedTablesUsingPartitionedCache(5, false, false);
}
/**
*/
@Test
public void testReplicatedTablesUsingPartitionedCacheClient() {
doTestReplicatedTablesUsingPartitionedCache(1, true, false);
}
/**
*/
@Test
public void testReplicatedTablesUsingPartitionedCacheSegmentedClient() {
doTestReplicatedTablesUsingPartitionedCache(5, true, false);
}
/**
*/
@Test
public void testReplicatedTablesUsingPartitionedCacheRO() {
doTestReplicatedTablesUsingPartitionedCache(1, false, true);
}
/**
*/
@Test
public void testReplicatedTablesUsingPartitionedCacheSegmentedRO() {
doTestReplicatedTablesUsingPartitionedCache(5, false, true);
}
/**
*/
@Test
public void testReplicatedTablesUsingPartitionedCacheClientRO() {
doTestReplicatedTablesUsingPartitionedCache(1, true, true);
}
/**
*/
@Test
public void testReplicatedTablesUsingPartitionedCacheSegmentedClientRO() {
doTestReplicatedTablesUsingPartitionedCache(5, true, true);
}
/**
*/
private SqlFieldsQuery query(String sql, boolean replicatedOnly) {
SqlFieldsQuery qry = new SqlFieldsQuery(sql);
if (replicatedOnly)
qry.setReplicatedOnly(true);
return qry;
}
/**
*/
private void doTestReplicatedTablesUsingPartitionedCache(int segments, boolean client, boolean replicatedOnlyFlag) {
IgniteCache<Integer,Value> p = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("p", true,
Integer.class, Value.class).setQueryParallelism(segments));
IgniteCache<Integer,Value> r = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("r", false,
Integer.class, Value.class));
try {
int cnt = 1000;
for (int i = 0; i < cnt; i++)
r.put(i, new Value(i, -i));
// Query data from replicated table using partitioned cache.
assertEquals(cnt, p.query(query("select 1 from \"r\".Value", replicatedOnlyFlag))
.getAll().size());
List<List<?>> res = p.query(query("select count(1) from \"r\".Value", replicatedOnlyFlag)).getAll();
assertEquals(1, res.size());
assertEquals(cnt, ((Number)res.get(0).get(0)).intValue());
}
finally {
p.destroy();
r.destroy();
}
}
@Test
public void testPartitionedTablesUsingReplicatedCache() {
doTestPartitionedTablesUsingReplicatedCache(1, false);
}
@Test
public void testPartitionedTablesUsingReplicatedCacheSegmented() {
doTestPartitionedTablesUsingReplicatedCache(7, false);
}
@Test
public void testPartitionedTablesUsingReplicatedCacheClient() {
doTestPartitionedTablesUsingReplicatedCache(1, true);
}
@Test
public void testPartitionedTablesUsingReplicatedCacheSegmentedClient() {
doTestPartitionedTablesUsingReplicatedCache(7, true);
}
/**
*/
private void doTestPartitionedTablesUsingReplicatedCache(int segments, boolean client) {
IgniteCache<Integer,Value> p = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("p", true,
Integer.class, Value.class).setQueryParallelism(segments));
IgniteCache<Integer,Value> r = ignite(client ? CLIENT : 0).getOrCreateCache(cacheConfig("r", false,
Integer.class, Value.class));
try {
int cnt = 1000;
for (int i = 0; i < cnt; i++)
p.put(i, new Value(i, -i));
// Query data from replicated table using partitioned cache.
assertEquals(cnt, r.query(new SqlFieldsQuery("select 1 from \"p\".Value")).getAll().size());
List<List<?>> res = r.query(new SqlFieldsQuery("select count(1) from \"p\".Value")).getAll();
assertEquals(1, res.size());
assertEquals(cnt, ((Number)res.get(0).get(0)).intValue());
}
finally {
p.destroy();
r.destroy();
}
}
/**
*/
@Test
public void testSubQueryWithAggregate() {
CacheConfiguration ccfg1 = cacheConfig("pers", true,
AffinityKey.class, Person2.class);
IgniteCache<AffinityKey<Integer>, Person2> c1 = ignite(0).getOrCreateCache(ccfg1);
try {
int orgId = 100500;
c1.put(new AffinityKey<>(1, orgId), new Person2(orgId, "Vasya"));
c1.put(new AffinityKey<>(2, orgId), new Person2(orgId, "Another Vasya"));
List<List<?>> rs = c1.query(new SqlFieldsQuery("select name, " +
"select count(1) from Person2 q where q.orgId = p.orgId " +
"from Person2 p order by name desc")).getAll();
assertEquals(2, rs.size());
assertEquals("Vasya", rs.get(0).get(0));
assertEquals(2L, rs.get(0).get(1));
assertEquals("Another Vasya", rs.get(1).get(0));
assertEquals(2L, rs.get(1).get(1));
}
finally {
c1.destroy();
}
}
/**
* @throws InterruptedException If failed.
*/
@Test
public void testDistributedJoinFromReplicatedCache() throws InterruptedException {
CacheConfiguration ccfg1 = cacheConfig("pers", true,
Integer.class, Person2.class);
CacheConfiguration ccfg2 = cacheConfig("org", true,
Integer.class, Organization.class);
CacheConfiguration ccfg3 = cacheConfig("orgRepl", false,
Integer.class, Organization.class);
IgniteCache<Integer, Person2> c1 = ignite(0).getOrCreateCache(ccfg1);
IgniteCache<Integer, Organization> c2 = ignite(0).getOrCreateCache(ccfg2);
IgniteCache<Integer, Organization> c3 = ignite(0).getOrCreateCache(ccfg3);
try {
awaitPartitionMapExchange();
doTestDistributedJoins(c3, c1, c2, 300, 2000, 5, false);
doTestDistributedJoins(c3, c1, c2, 300, 2000, 5, true);
}
finally {
c1.destroy();
c2.destroy();
}
}
@SuppressWarnings("SuspiciousMethodCalls")
@Test
public void testExists() {
IgniteCache<Integer,Person2> x = ignite(0).getOrCreateCache(cacheConfig("x", true,
Integer.class, Person2.class));
IgniteCache<Integer,Person2> y = ignite(0).getOrCreateCache(cacheConfig("y", true,
Integer.class, Person2.class));
try {
GridRandom rnd = new GridRandom();
Set<Integer> intersects = new HashSet<>();
for (int i = 0; i < 3000; i++) {
int r = rnd.nextInt(3);
if (r != 0)
x.put(i, new Person2(i, "pers_x_" + i));
if (r != 1)
y.put(i, new Person2(i, "pers_y_" + i));
if (r == 2)
intersects.add(i);
}
assertFalse(intersects.isEmpty());
List<List<?>> res = x.query(new SqlFieldsQuery("select _key from \"x\".Person2 px " +
"where exists(select 1 from \"y\".Person2 py where px._key = py._key)")).getAll();
assertEquals(intersects.size(), res.size());
for (List<?> row : res)
assertTrue(intersects.contains(row.get(0)));
}
finally {
x.destroy();
y.destroy();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testSortedMergeIndex() throws Exception {
IgniteCache<Integer,Value> c = ignite(0).getOrCreateCache(cacheConfig("v", true,
Integer.class, Value.class));
try {
GridTestUtils.setFieldValue(null, ReduceIndex.class, "PREFETCH_SIZE", 8);
Random rnd = new GridRandom();
int cnt = 1000;
for (int i = 0; i < cnt; i++) {
c.put(i, new Value(
rnd.nextInt(5) == 0 ? null: rnd.nextInt(100),
rnd.nextInt(8) == 0 ? null: rnd.nextInt(2000)));
}
List<List<?>> plan = c.query(new SqlFieldsQuery(
"explain select snd from Value order by fst desc")).getAll();
String rdcPlan = (String)plan.get(1).get(0);
assertTrue(rdcPlan.contains("merge_sorted"));
assertTrue(rdcPlan.contains("/* index sorted */"));
plan = c.query(new SqlFieldsQuery(
"explain select snd from Value")).getAll();
rdcPlan = (String)plan.get(1).get(0);
assertTrue(rdcPlan.contains("merge_scan"));
assertFalse(rdcPlan.contains("/* index sorted */"));
for (int i = 0; i < 10; i++) {
X.println(" --> " + i);
List<List<?>> res = c.query(new SqlFieldsQuery(
"select fst from Value order by fst").setPageSize(5)
).getAll();
assertEquals(cnt, res.size());
Integer p = null;
for (List<?> row : res) {
Integer x = (Integer)row.get(0);
if (x != null) {
if (p != null)
assertTrue(x + " >= " + p, x >= p);
p = x;
}
}
}
}
finally {
GridTestUtils.setFieldValue(null, ReduceIndex.class, "PREFETCH_SIZE", 1024);
c.destroy();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testGroupIndexOperations() throws Exception {
IgniteCache<Integer, GroupIndexTestValue> c = ignite(0).getOrCreateCache(cacheConfig("grp", false,
Integer.class, GroupIndexTestValue.class));
try {
awaitPartitionMapExchange();
// Check group index usage.
String qry = "select 1 from GroupIndexTestValue ";
String plan = columnQuery(c, "explain " + qry + "where a = 1 and b > 0")
.get(0).toString();
info("Plan: " + plan);
assertTrue("_explain: " + plan, plan.toLowerCase().contains("grpidx"));
// Sorted list
List<GroupIndexTestValue> list = F.asList(
new GroupIndexTestValue(0, 0),
new GroupIndexTestValue(0, 5),
new GroupIndexTestValue(1, 1),
new GroupIndexTestValue(1, 3),
new GroupIndexTestValue(2, -1),
new GroupIndexTestValue(2, 2)
);
// Fill cache.
for (int i = 0; i < list.size(); i++)
c.put(i, list.get(i));
// Check results.
assertEquals(1, columnQuery(c, qry + "where a = 1 and b = 1").size());
assertEquals(0, columnQuery(c, qry + "where a = 1 and b = 2").size());
assertEquals(1, columnQuery(c, qry + "where a = 1 and b = 3").size());
assertEquals(2, columnQuery(c, qry + "where a = 1 and b < 4").size());
assertEquals(2, columnQuery(c, qry + "where a = 1 and b <= 3").size());
assertEquals(1, columnQuery(c, qry + "where a = 1 and b < 3").size());
assertEquals(2, columnQuery(c, qry + "where a = 1 and b > 0").size());
assertEquals(1, columnQuery(c, qry + "where a = 1 and b > 1").size());
assertEquals(2, columnQuery(c, qry + "where a = 1 and b >= 1").size());
assertEquals(4, columnQuery(c, qry + "where a > 0").size());
assertEquals(4, columnQuery(c, qry + "where a >= 1").size());
assertEquals(4, columnQuery(c, qry + "where b > 0").size());
assertEquals(4, columnQuery(c, qry + "where b >= 1").size());
assertEquals(4, columnQuery(c, qry + "where a < 2").size());
assertEquals(4, columnQuery(c, qry + "where a <= 1").size());
assertEquals(4, columnQuery(c, qry + "where b < 3").size());
assertEquals(5, columnQuery(c, qry + "where b <= 3").size());
assertEquals(3, columnQuery(c, qry + "where a > 0 and b > 0").size());
assertEquals(2, columnQuery(c, qry + "where a > 0 and b >= 2").size());
assertEquals(3, columnQuery(c, qry + "where a >= 1 and b > 0").size());
assertEquals(2, columnQuery(c, qry + "where a >= 1 and b >= 2").size());
assertEquals(3, columnQuery(c, qry + "where a > 0 and b < 3").size());
assertEquals(2, columnQuery(c, qry + "where a > 0 and b <= 1").size());
assertEquals(3, columnQuery(c, qry + "where a >= 1 and b < 3").size());
assertEquals(2, columnQuery(c, qry + "where a >= 1 and b <= 1").size());
assertEquals(2, columnQuery(c, qry + "where a < 2 and b < 3").size());
assertEquals(2, columnQuery(c, qry + "where a < 2 and b <= 1").size());
assertEquals(2, columnQuery(c, qry + "where a <= 1 and b < 3").size());
assertEquals(2, columnQuery(c, qry + "where a <= 1 and b <= 1").size());
assertEquals(3, columnQuery(c, qry + "where a < 2 and b > 0").size());
assertEquals(2, columnQuery(c, qry + "where a < 2 and b >= 3").size());
assertEquals(3, columnQuery(c, qry + "where a <= 1 and b > 0").size());
assertEquals(2, columnQuery(c, qry + "where a <= 1 and b >= 3").size());
}
finally {
c.destroy();
}
}
/**
*/
@Test
public void testUseIndexHints() {
CacheConfiguration ccfg = cacheConfig("pers", true,
Integer.class, Person2.class);
IgniteCache<Integer, Person2> c = ignite(0).getOrCreateCache(ccfg);
try {
String select = "select 1 from Person2 use index (\"PERSON2_ORGID_IDX\") where name = '' and orgId = 1";
String plan = c.query(new SqlFieldsQuery("explain " + select)).getAll().toString();
X.println("Plan: \n" + plan);
assertTrue(plan.contains("USE INDEX (PERSON2_ORGID_IDX)"));
assertTrue(plan.contains("/* \"pers\".PERSON2_ORGID_IDX:"));
select = "select 1 from Person2 use index (\"PERSON2_NAME_IDX\") where name = '' and orgId = 1";
plan = c.query(new SqlFieldsQuery("explain " + select)).getAll().toString();
X.println("Plan: \n" + plan);
assertTrue(plan.contains("USE INDEX (PERSON2_NAME_IDX)"));
assertTrue(plan.contains("/* \"pers\".PERSON2_NAME_IDX:"));
}
finally {
c.destroy();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testDistributedJoins() throws Exception {
CacheConfiguration ccfg1 = cacheConfig("pers", true,
Integer.class, Person2.class);
CacheConfiguration ccfg2 = cacheConfig("org", true,
Integer.class, Organization.class);
IgniteCache<Integer, Person2> c1 = ignite(0).getOrCreateCache(ccfg1);
IgniteCache<Integer, Organization> c2 = ignite(0).getOrCreateCache(ccfg2);
try {
awaitPartitionMapExchange();
doTestDistributedJoins(c2, c1, c2, 30, 100, 1000, false);
doTestDistributedJoins(c2, c1, c2, 30, 100, 1000, true);
doTestDistributedJoins(c2, c1, c2, 3, 10, 3, false);
doTestDistributedJoins(c2, c1, c2, 3, 10, 3, true);
doTestDistributedJoins(c2, c1, c2, 300, 2000, 5, false);
doTestDistributedJoins(c2, c1, c2, 300, 2000, 5, true);
}
finally {
c1.destroy();
c2.destroy();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testDistributedJoinsUnion() throws Exception {
CacheConfiguration ccfg1 = cacheConfig("pers", true, Integer.class, Person2.class);
CacheConfiguration ccfg2 = cacheConfig("org", true, Integer.class, Organization.class);
IgniteCache<Integer, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
IgniteCache<Integer, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
try {
c2.put(1, new Organization("o1"));
c2.put(2, new Organization("o2"));
c1.put(3, new Person2(1, "p1"));
c1.put(4, new Person2(2, "p2"));
c1.put(5, new Person2(3, "p3"));
String select = "select o.name n1, p.name n2 from Person2 p, \"org\".Organization o" +
" where p.orgId = o._key and o._key=1" +
" union select o.name n1, p.name n2 from Person2 p, \"org\".Organization o" +
" where p.orgId = o._key and o._key=2";
String plan = c1.query(new SqlFieldsQuery("explain " + select)
.setDistributedJoins(true).setEnforceJoinOrder(true))
.getAll().toString();
X.println("Plan : " + plan);
assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched"));
assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched:unicast"));
assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)
.setEnforceJoinOrder(false)).getAll().size());
select = "select * from (" + select + ")";
plan = c1.query(new SqlFieldsQuery("explain " + select)
.setDistributedJoins(true).setEnforceJoinOrder(true))
.getAll().toString();
X.println("Plan : " + plan);
assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched"));
assertEquals(2, StringUtils.countOccurrencesOf(plan, "batched:unicast"));
assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)
.setEnforceJoinOrder(false)).getAll().size());
}
finally {
c1.destroy();
c2.destroy();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testDistributedJoinsUnionPartitionedReplicated() throws Exception {
CacheConfiguration ccfg1 = cacheConfig("pers", true,
Integer.class, Person2.class);
CacheConfiguration ccfg2 = cacheConfig("org", false,
Integer.class, Organization.class);
IgniteCache<Integer, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
IgniteCache<Integer, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
try {
c2.put(1, new Organization("o1"));
c2.put(2, new Organization("o2"));
c1.put(3, new Person2(1, "p1"));
c1.put(4, new Person2(2, "p2"));
c1.put(5, new Person2(3, "p3"));
String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1" +
" union select o.name n1, p.name n2 from \"org\".Organization o, \"pers\".Person2 p where p.orgId = o._key and o._key=2";
String plan = (String)c1.query(new SqlFieldsQuery("explain " + select0)
.setDistributedJoins(true))
.getAll().get(0).get(0);
X.println("Plan: " + plan);
assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
assertEquals(2, c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll().size());
String select = "select * from (" + select0 + ")";
plan = (String)c1.query(new SqlFieldsQuery("explain " + select)
.setDistributedJoins(true))
.getAll().get(0).get(0);
X.println("Plan : " + plan);
assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size());
String select1 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1" +
" union select * from (select o.name n1, p.name n2 from \"org\".Organization o, \"pers\".Person2 p where p.orgId = o._key and o._key=2)";
plan = (String)c1.query(new SqlFieldsQuery("explain " + select1)
.setDistributedJoins(true)).getAll().get(0).get(0);
X.println("Plan: " + plan);
assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size());
select = "select * from (" + select1 + ")";
plan = (String)c1.query(new SqlFieldsQuery("explain " + select)
.setDistributedJoins(true)).getAll().get(0).get(0);
X.println("Plan : " + plan);
assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched"));
assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size());
}
finally {
c1.destroy();
c2.destroy();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testDistributedJoinsPlan() throws Exception {
List<IgniteCache<Object, Object>> caches = new ArrayList<>();
IgniteCache<Object, Object> persPart =
ignite(0).createCache(cacheConfig("persPart", true, Integer.class, Person2.class));
caches.add(persPart);
IgniteCache<Object, Object> persPartAff =
ignite(0).createCache(cacheConfig("persPartAff", true, TestKey.class, Person2.class));
caches.add(persPartAff);
IgniteCache<Object, Object> orgPart =
ignite(0).createCache(cacheConfig("orgPart", true, Integer.class, Organization.class));
caches.add(orgPart);
IgniteCache<Object, Object> orgPartAff =
ignite(0).createCache(cacheConfig("orgPartAff", true, TestKey.class, Organization.class));
caches.add(orgPartAff);
IgniteCache<Object, Object> orgRepl =
ignite(0).createCache(cacheConfig("orgRepl", false, Integer.class, Organization.class));
caches.add(orgRepl);
IgniteCache<Object, Object> orgRepl2 =
ignite(0).createCache(cacheConfig("orgRepl2", false, Integer.class, Organization.class));
caches.add(orgRepl2);
try {
// Join two partitioned.
checkQueryPlan(persPart,
true,
1,
"select p._key k1, o._key k2 " +
"from \"persPart\".Person2 p, \"orgPart\".Organization o " +
"where p.orgId = o._key",
"batched:unicast");
checkQueryPlan(persPart,
false,
1,
"select p._key k1, o._key k2 " +
"from \"persPart\".Person2 p, \"orgPartAff\".Organization o " +
"where p.orgId = o.affKey",
"batched:unicast");
checkQueryPlan(persPart,
false,
1,
"select p._key k1, o._key k2 " +
"from \"persPart\".Person2 p, \"orgPart\".Organization o " +
"where p.orgId = o._key",
"batched:unicast");
checkQueryPlan(persPart,
false,
1,
"select p._key k1, o._key k2 " +
"from \"persPart\".Person2 p inner join \"orgPart\".Organization o " +
"on p.orgId = o._key",
"batched:unicast");
checkQueryPlan(persPart,
false,
1,
"select p._key k1, o._key k2 " +
"from \"persPart\".Person2 p left outer join \"orgPart\".Organization o " +
"on p.orgId = o._key",
"batched:unicast");
checkQueryPlan(persPart,
true,
1,
"select p._key k1, o._key k2 " +
"from \"orgPart\".Organization o, \"persPart\".Person2 p " +
"where p.orgId = o._key",
"batched:broadcast");
checkQueryPlan(persPart,
true,
1,
"select p._key k1, o._key k2 " +
"from \"orgPartAff\".Organization o, \"persPart\".Person2 p " +
"where p.orgId = o.affKey",
"batched:broadcast");
// Join partitioned and replicated.
checkQueryPlan(persPart,
true,
0,
"select p._key k1, o._key k2 " +
"from \"persPart\".Person2 p, \"orgRepl\".Organization o " +
"where p.orgId = o._key");
checkQueryPlan(persPart,
false,
0,
"select p._key k1, o._key k2 " +
"from \"persPart\".Person2 p, \"orgRepl\".Organization o " +
"where p.orgId = o._key");
checkQueryPlan(persPart,
false,
0,
"select p._key k1, o._key k2 " +
"from \"persPart\".Person2 p, (select _key, _val, * from \"orgRepl\".Organization) o " +
"where p.orgId = o._key");
checkQueryPlan(persPart,
false,
0,
"select p._key k1, o._key k2 " +
"from (select _key, _val, * from \"orgRepl\".Organization) o, \"persPart\".Person2 p " +
"where p.orgId = o._key");
checkQueryPlan(persPart,
false,
0,
"select p._key k1, o._key k2 " +
"from \"persPart\".Person2 p inner join \"orgRepl\".Organization o " +
"on p.orgId = o._key");
checkQueryPlan(persPart,
false,
0,
"select p._key k1, o._key k2 " +
"from \"persPart\".Person2 p left outer join \"orgRepl\".Organization o " +
"on p.orgId = o._key");
checkQueryPlan(persPart,
false,
0,
"select p._key k1, o._key k2 " +
"from \"orgRepl\".Organization o, \"persPart\".Person2 p " +
"where p.orgId = o._key");
checkQueryPlan(persPart,
false,
0,
"select p._key k1, o._key k2 " +
"from \"orgRepl\".Organization o inner join \"persPart\".Person2 p " +
"on p.orgId = o._key");
// checkQueryPlan(persPart,
// true,
// 1,
// "select p._key k1, o._key k2 " +
// "from \"orgRepl\".Organization o left outer join \"persPart\".Person2 p " +
// "on p.orgId = o._key",
// "batched:broadcast");
// Join on affinity keys.
checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
"\"persPart\".Person2 p",
"\"orgPart\".Organization o",
"where p._key = o._key", true);
checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
"\"persPart\".Person2 p",
"\"orgRepl\".Organization o",
"where p._key = o._key", true);
checkNoBatchedJoin(persPartAff, "select p._key k1, o._key k2 ",
"\"persPartAff\".Person2 p",
"\"orgPart\".Organization o",
"where p.affKey = o._key", true);
checkNoBatchedJoin(persPartAff, "select p._key k1, o._key k2 ",
"\"persPartAff\".Person2 p",
"\"orgRepl\".Organization o",
"where p.affKey = o._key", true);
// TODO Now we can not analyze subqueries to decide if we are collocated or not.
// checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
// "(select * from \"persPart\".Person2) p",
// "\"orgPart\".Organization o",
// "where p._key = o._key", false);
// checkNoBatchedJoin(persPart, "select p._key k1, o._key k2 ",
// "\"persPart\".Person2 p",
// "(select * from \"orgPart\".Organization) o",
// "where p._key = o._key", false);
// Join multiple.
{
String sql = "select * from " +
"(select o1._key k1, o2._key k2 from \"orgRepl\".Organization o1, \"orgRepl2\".Organization o2 where o1._key > o2._key) o, " +
"\"persPart\".Person2 p where p.orgId = o.k1";
checkQueryPlan(persPart,
false,
0,
sql);
checkQueryPlan(persPart,
true,
0,
sql);
sql = "select o.k1, p1._key k2, p2._key k3 from " +
"(select o1._key k1, o2._key k2 " +
"from \"orgRepl\".Organization o1, \"orgRepl2\".Organization o2 " +
"where o1._key > o2._key) o, " +
"\"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
"where p1._key=p2._key and p2.orgId = o.k1";
checkQueryPlan(persPart,
false,
0,
sql,
"persPartAff", "persPart", "orgRepl");
checkQueryFails(persPart, sql, true);
sql = "select o.ok, p._key from " +
"(select o1._key ok, p1._key pk " +
"from \"orgRepl\".Organization o1, \"persPart\".Person2 p1 " +
"where o1._key = p1.orgId) o, " +
"\"persPartAff\".Person2 p where p._key=o.ok";
checkQueryPlan(persPart,
false,
1,
sql,
"FROM \"persPart\"", "INNER JOIN \"orgRepl\"",
"INNER JOIN \"persPartAff\"", "batched:unicast");
checkQueryFails(persPart, sql, true);
}
{
String sql = "select p1._key k1, p2._key k2, o._key k3 " +
"from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgPart\".Organization o " +
"where p1.affKey=p2._key and p2.orgId = o._key";
checkQueryPlan(persPart,
true,
2,
sql,
"batched:unicast", "batched:unicast");
checkQueryPlan(persPart,
false,
2,
sql,
"batched:unicast", "batched:unicast");
}
{
String sql = "select p1._key k1, p2._key k2, o._key k3 " +
"from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgPart\".Organization o " +
"where p1.affKey > p2._key and p2.orgId = o._key";
checkQueryPlan(persPart,
true,
2,
sql,
"batched:broadcast", "batched:unicast");
checkQueryPlan(persPart,
false,
2,
sql,
"batched:broadcast", "batched:unicast");
}
{
// First join is collocated, second is replicated.
String sql = "select p1._key k1, p2._key k2, o._key k3 " +
"from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " +
"where p1.affKey=p2._key and p2.orgId = o._key";
checkQueryPlan(persPart,
true,
0,
sql);
checkQueryPlan(persPart,
false,
0,
sql);
}
{
String sql = "select p1._key k1, p2._key k2, o._key k3 " +
"from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " +
"where p1._key=p2.name and p2.orgId = o._key";
checkQueryPlan(persPart,
false,
1,
sql,
"batched:unicast");
sql = "select p1._key k1, p2._key k2, o._key k3 " +
"from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " +
"where p1._key=p2._key and p2.orgId = o._key";
checkQueryPlan(persPart,
false,
0,
sql);
sql = "select p1._key k1, p2._key k2, o._key k3 " +
"from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
"where p1._key=p2.name and p2.orgId = o._key";
checkQueryPlan(persPart,
false,
1,
sql,
"batched:unicast");
sql = "select p1._key k1, p2._key k2, o._key k3 " +
"from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
"where p1._key=p2._key and p2.orgId = o._key";
checkQueryPlan(persPart,
false,
0,
sql);
sql = "select p1._key k1, p2._key k2, o._key k3 " +
"from (select _key, _val, * from \"orgRepl\".Organization) o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
"where p1._key=p2.name and p2.orgId = o._key";
checkQueryPlan(persPart,
false,
1,
sql,
"batched:unicast");
sql = "select p1._key k1, p2._key k2, o._key k3 " +
"from (select _key, _val, * from \"orgRepl\".Organization) o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
"where p1._key=p2._key and p2.orgId = o._key";
checkQueryPlan(persPart,
false,
0,
sql);
}
}
finally {
for (IgniteCache<Object, Object> cache : caches)
ignite(0).destroyCache(cache.getName());
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testDistributedJoinsEnforceReplicatedNotLast() throws Exception {
List<IgniteCache<Object, Object>> caches = new ArrayList<>();
IgniteCache<Object, Object> persPart =
ignite(0).createCache(cacheConfig("persPart", true, Integer.class, Person2.class));
caches.add(persPart);
IgniteCache<Object, Object> persPartAff =
ignite(0).createCache(cacheConfig("persPartAff", true, TestKey.class, Person2.class));
caches.add(persPartAff);
IgniteCache<Object, Object> orgRepl =
ignite(0).createCache(cacheConfig("orgRepl", false, Integer.class, Organization.class));
caches.add(orgRepl);
try {
checkQueryFails(persPart, "select p1._key k1, p2._key k2, o._key k3 " +
"from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " +
"where p1._key=p2._key and p2.orgId = o._key", true);
checkQueryFails(persPart, "select p1._key k1, p2._key k2, o._key k3 " +
"from \"persPartAff\".Person2 p1, \"orgRepl\".Organization o, \"persPart\".Person2 p2 " +
"where p1._key=p2._key and p2.orgId = o._key", true);
checkQueryFails(persPart, "select p1._key k1, p2._key k2, o._key k3 " +
"from \"persPartAff\".Person2 p1, (select * from \"orgRepl\".Organization) o, \"persPart\".Person2 p2 " +
"where p1._key=p2._key and p2.orgId = o._key", true);
checkQueryPlan(persPart,
true,
0,
"select p._key k1, o._key k2 from \"orgRepl\".Organization o, \"persPart\".Person2 p");
checkQueryPlan(persPart,
true,
0,
"select p._key k1, o._key k2 from \"orgRepl\".Organization o, \"persPart\".Person2 p union " +
"select p._key k1, o._key k2 from \"persPart\".Person2 p, \"orgRepl\".Organization o");
}
finally {
for (IgniteCache<Object, Object> cache : caches)
ignite(0).destroyCache(cache.getName());
}
}
/**
*/
@Test
public void testSchemaQuoted() {
doTestSchemaName("\"ppAf\"");
}
/**
*/
@Test
public void testSchemaQuotedUpper() {
doTestSchemaName("\"PPAF\"");
}
/**
*/
@Test
public void testSchemaUnquoted() {
doTestSchemaName("ppAf");
}
/**
*/
@Test
public void testSchemaUnquotedUpper() {
doTestSchemaName("PPAF");
}
/**
* @param schema Schema name.
*/
public void doTestSchemaName(String schema) {
CacheConfiguration ccfg = cacheConfig("persPartAff", true, Integer.class, Person2.class);
ccfg.setSqlSchema(schema);
IgniteCache<Integer, Person2> ppAf = ignite(0).createCache(ccfg);
try {
ppAf.put(1, new Person2(10, "Petya"));
ppAf.put(2, new Person2(10, "Kolya"));
List<List<?>> res = ppAf.query(new SqlFieldsQuery("select name from " +
schema + ".Person2 order by _key")).getAll();
assertEquals("Petya", res.get(0).get(0));
assertEquals("Kolya", res.get(1).get(0));
}
finally {
ppAf.destroy();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testIndexSegmentation() throws Exception {
CacheConfiguration ccfg1 = cacheConfig("pers", true,
Integer.class, Person2.class).setQueryParallelism(4);
CacheConfiguration ccfg2 = cacheConfig("org", true,
Integer.class, Organization.class).setQueryParallelism(4);
IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
try {
c2.put(1, new Organization("o1"));
c2.put(2, new Organization("o2"));
c1.put(3, new Person2(1, "p1"));
c1.put(4, new Person2(2, "p2"));
c1.put(5, new Person2(3, "p3"));
String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1";
checkQueryPlan(c1, true, 1, new SqlFieldsQuery(select0));
checkQueryPlan(c1, true, 1, new SqlFieldsQuery(select0).setLocal(true));
}
finally {
c1.destroy();
c2.destroy();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testReplicationCacheIndexSegmentationFailure() throws Exception {
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
CacheConfiguration ccfg = cacheConfig("org", false,
Integer.class, Organization.class).setQueryParallelism(4);
IgniteCache<Object, Object> c = ignite(0).createCache(ccfg);
return null;
}
}, CacheException.class, " Segmented indices are supported for PARTITIONED mode only.");
}
/**
* @throws Exception If failed.
*/
@Test
public void testIndexSegmentationPartitionedReplicated() throws Exception {
CacheConfiguration ccfg1 = cacheConfig("pers", true,
Integer.class, Person2.class).setQueryParallelism(4);
CacheConfiguration ccfg2 = cacheConfig("org", false,
Integer.class, Organization.class);
final IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
final IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
try {
c2.put(1, new Organization("o1"));
c2.put(2, new Organization("o2"));
c1.put(3, new Person2(1, "p1"));
c1.put(4, new Person2(2, "p2"));
c1.put(5, new Person2(3, "p3"));
String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key";
SqlFieldsQuery qry = new SqlFieldsQuery(select0);
qry.setDistributedJoins(true);
List<List<?>> results = c1.query(qry).getAll();
assertEquals(2, results.size());
select0 += " order by n2 desc";
qry = new SqlFieldsQuery(select0);
qry.setDistributedJoins(true);
results = c1.query(qry).getAll();
assertEquals(2, results.size());
assertEquals("p2", results.get(0).get(1));
assertEquals("p1", results.get(1).get(1));
// Test for replicated subquery with aggregate.
select0 = "select p.name " +
"from \"pers\".Person2 p, " +
"(select max(_key) orgId from \"org\".Organization) o " +
"where p.orgId = o.orgId";
X.println("Plan: \n" +
c1.query(new SqlFieldsQuery("explain " + select0).setDistributedJoins(true)).getAll());
qry = new SqlFieldsQuery(select0);
qry.setDistributedJoins(true);
results = c1.query(qry).getAll();
assertEquals(1, results.size());
assertEquals("p2", results.get(0).get(0));
}
finally {
c1.destroy();
c2.destroy();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testIndexWithDifferentSegmentationLevelsFailure() throws Exception {
CacheConfiguration ccfg1 = cacheConfig("pers", true,
Integer.class, Person2.class).setQueryParallelism(4);
CacheConfiguration ccfg2 = cacheConfig("org", true,
Integer.class, Organization.class).setQueryParallelism(3);
final IgniteCache<Object, Object> c1 = ignite(0).getOrCreateCache(ccfg1);
final IgniteCache<Object, Object> c2 = ignite(0).getOrCreateCache(ccfg2);
try {
c2.put(1, new Organization("o1"));
c2.put(2, new Organization("o2"));
c1.put(3, new Person2(1, "p1"));
c1.put(4, new Person2(2, "p2"));
c1.put(5, new Person2(3, "p3"));
String select0 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1";
final SqlFieldsQuery qry = new SqlFieldsQuery(select0);
qry.setDistributedJoins(true);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
c1.query(qry);
return null;
}
}, CacheException.class, "Using indexes with different parallelism levels in same query is forbidden.");
}
finally {
c1.destroy();
c2.destroy();
}
}
/**
* @param cache Cache.
* @param sql SQL.
* @param enforceJoinOrder Enforce join order flag.
*/
private void checkQueryFails(final IgniteCache<Object, Object> cache,
String sql,
boolean enforceJoinOrder) {
final SqlFieldsQuery qry = new SqlFieldsQuery(sql);
qry.setDistributedJoins(true);
qry.setEnforceJoinOrder(enforceJoinOrder);
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
cache.query(qry);
return null;
}
}, CacheException.class, null);
}
/**
* @param cache Query cache.
* @param select Select clause.
* @param cache1 Cache name1.
* @param cache2 Cache name2.
* @param where Where clause.
* @param testEnforceJoinOrder If {@code true} tests query with enforced join order.
*/
private void checkNoBatchedJoin(IgniteCache<Object, Object> cache,
String select,
String cache1,
String cache2,
String where,
boolean testEnforceJoinOrder) {
checkQueryPlan(cache,
false,
0,
select +
"from " + cache1 + "," + cache2 + " " + where);
checkQueryPlan(cache,
false,
0,
select +
"from " + cache2 + "," + cache1 + " " + where);
if (testEnforceJoinOrder) {
checkQueryPlan(cache,
true,
0,
select +
"from " + cache1 + "," + cache2 + " " + where);
checkQueryPlan(cache,
true,
0,
select +
"from " + cache2 + "," + cache1 + " " + where);
}
}
/**
* @param cache Cache.
* @param enforceJoinOrder Enforce join order flag.
* @param expBatchedJoins Expected batched joins count.
* @param sql Query.
* @param expText Expected text to find in plan.
*/
private void checkQueryPlan(IgniteCache<Object, Object> cache,
boolean enforceJoinOrder,
int expBatchedJoins,
String sql,
String...expText
) {
checkQueryPlan(cache,
enforceJoinOrder,
expBatchedJoins,
new SqlFieldsQuery(sql),
expText);
sql = "select * from (" + sql + ")";
checkQueryPlan(cache,
enforceJoinOrder,
expBatchedJoins,
new SqlFieldsQuery(sql),
expText);
sql = "select * from (" + sql + ")";
checkQueryPlan(cache,
enforceJoinOrder,
expBatchedJoins,
new SqlFieldsQuery(sql),
expText);
}
/**
* @param cache Cache.
* @param enforceJoinOrder Enforce join order flag.
* @param expBatchedJoins Expected batched joins count.
* @param qry Query.
* @param expText Expected text to find in plan.
*/
private void checkQueryPlan(IgniteCache<Object, Object> cache,
boolean enforceJoinOrder,
int expBatchedJoins,
SqlFieldsQuery qry,
String... expText) {
qry.setEnforceJoinOrder(enforceJoinOrder);
qry.setDistributedJoins(true);
String plan = queryPlan(cache, qry);
log.info("\n Plan:\n" + plan);
assertEquals("Unexpected number of batched joins in plan [plan=" + plan + ", qry=" + qry + ']',
expBatchedJoins,
StringUtils.countOccurrencesOf(plan, "batched"));
int startIdx = 0;
for (String exp : expText) {
int idx = plan.indexOf(exp, startIdx);
if (idx == -1) {
fail("Plan does not contain expected string [startIdx=" + startIdx +
", plan=" + plan +
", exp=" + exp + ']');
}
startIdx = idx + 1;
}
}
/**
* Test HAVING clause.
*/
@Test
public void testHaving() {
IgniteCache<Integer, Integer> c = ignite(0).getOrCreateCache(cacheConfig("having", true,
Integer.class, Integer.class));
try {
Random rnd = new GridRandom();
Map<Integer, AtomicLong> cntMap = new HashMap<>();
for (int i = 0; i < 1000; i++) {
int v = (int)(50 * rnd.nextGaussian());
c.put(i, v);
AtomicLong cnt = cntMap.get(v);
if (cnt == null)
cntMap.put(v, cnt = new AtomicLong());
cnt.incrementAndGet();
}
assertTrue(cntMap.size() > 10);
String sqlQry = "select _val, count(*) cnt from Integer group by _val having cnt > ?";
X.println("Plan: " + c.query(new SqlFieldsQuery("explain " + sqlQry).setArgs(0)).getAll());
for (int i = -1; i <= 1001; i += 10) {
List<List<?>> res = c.query(new SqlFieldsQuery(sqlQry).setArgs(i)).getAll();
for (List<?> row : res) {
int v = (Integer)row.get(0);
long cnt = (Long)row.get(1);
assertTrue(cnt + " > " + i, cnt > i);
assertEquals(cntMap.get(v).longValue(), cnt);
}
}
}
finally {
c.destroy();
}
}
/**
* @param c1 Persons cache.
* @param c2 Organizations cache.
* @param orgs Number of organizations.
* @param persons Number of persons.
* @param pageSize Page size.
* @param enforceJoinOrder Enforce join order.
*/
private void doTestDistributedJoins(
IgniteCache<?,?> qryCache,
IgniteCache<Integer, Person2> c1,
IgniteCache<Integer, Organization> c2,
int orgs,
int persons,
int pageSize,
boolean enforceJoinOrder
) {
assertEquals(0, c1.size(CachePeekMode.ALL));
assertEquals(0, c2.size(CachePeekMode.ALL));
int key = 0;
for (int i = 0; i < orgs; i++) {
Organization o = new Organization();
o.name = "Org" + i;
c2.put(key++, o);
}
Random rnd = new GridRandom();
for (int i = 0; i < persons; i++) {
Person2 p = new Person2();
p.name = "Person" + i;
p.orgId = rnd.nextInt(orgs);
c1.put(key++, p);
}
String select = "select count(*) from \"org\".Organization o, \"pers\".Person2 p where p.orgId = o._key";
String plan = (String)qryCache.query(new SqlFieldsQuery("explain " + select)
.setDistributedJoins(true).setEnforceJoinOrder(enforceJoinOrder).setPageSize(pageSize))
.getAll().get(0).get(0);
X.println("Plan : " + plan);
if (enforceJoinOrder)
assertTrue(plan, plan.contains("batched:broadcast"));
else
assertTrue(plan, plan.contains("batched:unicast"));
assertEquals((long)persons, qryCache.query(new SqlFieldsQuery(select).setDistributedJoins(true)
.setEnforceJoinOrder(enforceJoinOrder).setPageSize(pageSize)).getAll().get(0).get(0));
c1.clear();
c2.clear();
assertEquals(0, c1.size(CachePeekMode.ALL));
assertEquals(0L, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)
.setEnforceJoinOrder(enforceJoinOrder).setPageSize(pageSize)).getAll().get(0).get(0));
}
/**
* @param c Cache.
* @param qry Query.
* @param args Arguments.
* @return Column as list.
*/
private static <X> List<X> columnQuery(IgniteCache<?, ?> c, String qry, Object... args) {
return column(0, c.query(new SqlFieldsQuery(qry).setArgs(args)).getAll());
}
/**
* @param idx Column index.
* @param rows Rows.
* @return Column as list.
*/
private static <X> List<X> column(int idx, List<List<?>> rows) {
List<X> res = new ArrayList<>(rows.size());
for (List<?> row : rows)
res.add((X)row.get(idx));
return res;
}
/**
*
*/
@Ignore("https://issues.apache.org/jira/browse/IGNITE-1886")
@Test
public void testFunctionNpe() {
IgniteCache<Integer, User> userCache = ignite(0).createCache(
cacheConfig("UserCache", true, Integer.class, User.class));
IgniteCache<Integer, UserOrder> userOrderCache = ignite(0).createCache(
cacheConfig("UserOrderCache", true, Integer.class, UserOrder.class));
IgniteCache<Integer, OrderGood> orderGoodCache = ignite(0).createCache(
cacheConfig("OrderGoodCache", true, Integer.class, OrderGood.class));
try {
String sql =
"SELECT a.* FROM (" +
"SELECT CASE WHEN u.id < 100 THEN u.id ELSE ug.id END id " +
"FROM \"UserCache\".User u, UserOrder ug " +
"WHERE u.id = ug.userId" +
") a, (" +
"SELECT CASE WHEN og.goodId < 5 THEN 100 ELSE og.goodId END id " +
"FROM UserOrder ug, \"OrderGoodCache\".OrderGood og " +
"WHERE ug.id = og.orderId) b " +
"WHERE a.id = b.id";
userOrderCache.query(new SqlFieldsQuery(sql)).getAll();
}
finally {
userCache.destroy();
userOrderCache.destroy();
orderGoodCache.destroy();
}
}
/**
*
*/
@Test
public void testImplicitJoinConditionGeneration() {
IgniteCache<Integer, Person> p = ignite(0).createCache(cacheConfig("P", true, Integer.class, Person.class));
IgniteCache<Integer, Department> d = ignite(0).createCache(cacheConfig("D", true, Integer.class, Department.class));
IgniteCache<Integer, Org> o = ignite(0).createCache(cacheConfig("O", true, Integer.class, Org.class));
try {
info("Plan: " + p.query(new SqlFieldsQuery(
"explain select P.Person.*,dep.*,org.* " +
"from P.Person inner join D.Department dep ON dep.id=P.Person.depId " +
"left join O.Org org ON org.id=dep.orgId"
)).getAll());
assertEquals(0, p.query(new SqlFieldsQuery(
"select P.Person.*,dep.*,org.* " +
"from P.Person inner join D.Department dep ON dep.id=P.Person.depId " +
"left join O.Org org ON org.id=dep.orgId"
)).getAll().size());
}
finally {
p.destroy();
d.destroy();
o.destroy();
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testJoinWithSubquery() throws Exception {
IgniteCache<Integer, Contract> c1 = ignite(0).createCache(
cacheConfig("Contract", true,
Integer.class, Contract.class));
IgniteCache<Integer, PromoContract> c2 = ignite(0).createCache(
cacheConfig("PromoContract", true,
Integer.class, PromoContract.class));
for (int i = 0; i < 100; i++) {
int coId = i % 10;
int cust = i / 10;
c1.put( i, new Contract(coId, cust));
}
for (int i = 0; i < 10; i++)
c2.put(i, new PromoContract((i % 5) + 1, i));
final List<List<?>> res = c2.query(new SqlFieldsQuery("SELECT CO.CO_ID \n" +
"FROM PromoContract PMC \n" +
"INNER JOIN \"Contract\".Contract CO ON PMC.CO_ID = 5 \n" +
"AND PMC.CO_ID = CO.CO_ID \n" +
"INNER JOIN (SELECT CO_ID FROM PromoContract EBP WHERE EBP.CO_ID = 5 LIMIT 1) VPMC \n" +
"ON PMC.CO_ID = VPMC.CO_ID ")).getAll();
assertFalse(res.isEmpty());
}
/** @throws Exception if failed. */
@Test
public void testDistributedAggregates() throws Exception {
final String cacheName = "ints";
IgniteCache<Integer, Value> cache = ignite(0).getOrCreateCache(cacheConfig(cacheName, true,
Integer.class, Value.class));
AffinityKeyGenerator node0KeyGen = new AffinityKeyGenerator(ignite(0), cacheName);
AffinityKeyGenerator node1KeyGen = new AffinityKeyGenerator(ignite(1), cacheName);
AffinityKeyGenerator node2KeyGen = new AffinityKeyGenerator(ignite(2), cacheName);
try {
awaitPartitionMapExchange();
cache.put(node0KeyGen.next(), new Value(1, 3));
cache.put(node1KeyGen.next(), new Value(1, 3));
cache.put(node2KeyGen.next(), new Value(1, 3));
cache.put(node0KeyGen.next(), new Value(2, 1));
cache.put(node1KeyGen.next(), new Value(2, 2));
cache.put(node2KeyGen.next(), new Value(2, 3));
cache.put(node0KeyGen.next(), new Value(3, 1));
cache.put(node0KeyGen.next(), new Value(3, 1));
cache.put(node0KeyGen.next(), new Value(3, 2));
cache.put(node1KeyGen.next(), new Value(3, 1));
cache.put(node1KeyGen.next(), new Value(3, 2));
cache.put(node2KeyGen.next(), new Value(3, 2));
cache.put(node0KeyGen.next(), new Value(4, 2));
cache.put(node1KeyGen.next(), new Value(5, 2));
cache.put(node2KeyGen.next(), new Value(6, 2));
checkSimpleQueryWithAggr(cache);
checkSimpleQueryWithDistinctAggr(cache);
checkQueryWithGroupsAndAggrs(cache);
checkQueryWithGroupsAndDistinctAggr(cache);
checkSimpleQueryWithAggrMixed(cache);
checkQueryWithGroupsAndAggrMixed(cache);
}
finally {
cache.destroy();
}
}
/** @throws Exception if failed. */
@Test
public void testCollocatedAggregates() throws Exception {
final String cacheName = "ints";
IgniteCache<Integer, Value> cache = ignite(0).getOrCreateCache(cacheConfig(cacheName, true,
Integer.class, Value.class));
AffinityKeyGenerator node0KeyGen = new AffinityKeyGenerator(ignite(0), cacheName);
AffinityKeyGenerator node1KeyGen = new AffinityKeyGenerator(ignite(1), cacheName);
AffinityKeyGenerator node2KeyGen = new AffinityKeyGenerator(ignite(2), cacheName);
try {
awaitPartitionMapExchange();
cache.put(node0KeyGen.next(), new Value(1, 3));
cache.put(node0KeyGen.next(), new Value(1, 3));
cache.put(node0KeyGen.next(), new Value(1, 3));
cache.put(node1KeyGen.next(), new Value(2, 1));
cache.put(node1KeyGen.next(), new Value(2, 2));
cache.put(node1KeyGen.next(), new Value(2, 3));
cache.put(node2KeyGen.next(), new Value(3, 1));
cache.put(node2KeyGen.next(), new Value(3, 1));
cache.put(node2KeyGen.next(), new Value(3, 2));
cache.put(node2KeyGen.next(), new Value(3, 1));
cache.put(node2KeyGen.next(), new Value(3, 2));
cache.put(node2KeyGen.next(), new Value(3, 2));
cache.put(node0KeyGen.next(), new Value(4, 2));
cache.put(node1KeyGen.next(), new Value(5, 2));
cache.put(node2KeyGen.next(), new Value(6, 2));
checkQueryWithGroupsAndAggrs(cache);
checkQueryWithGroupsAndDistinctAggr(cache);
checkQueryWithGroupsAndAggrMixed(cache);
}
finally {
cache.destroy();
}
}
/**
* Check results of aggregate functions if no rows are selected.
*
* @throws Exception If failed,
*/
@Test
public void testEmptyCacheAggregates() throws Exception {
final String cacheName = "ints";
IgniteCache<Integer, Value> cache = ignite(0).getOrCreateCache(cacheConfig(cacheName, true,
Integer.class, Value.class));
try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
"SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd) FROM Value"))) {
List<List<?>> result = qry.getAll();
assertEquals(1, result.size());
List<?> row = result.get(0);
assertEquals("count", 0L, ((Number)row.get(0)).longValue());
assertEquals("sum", null, row.get(1));
assertEquals("avg", null, row.get(2));
assertEquals("min", null, row.get(3));
assertEquals("max", null, row.get(4));
}
finally {
cache.destroy();
}
}
/**
* Check avg() with various data types.
*
* @throws Exception If failed.
*/
@Test
public void testAvgVariousDataTypes() throws Exception {
final String cacheName = "avgtypes";
IgniteCache<Integer, AvgDataTypes> cache =
ignite(0).getOrCreateCache(cacheConfig(cacheName, true, Integer.class, AvgDataTypes.class));
// avg 13.125; int avg 13
double value[] = new double[] {1, 5, 7, 8, 10.5, 13.5, 20, 40};
for (int i = 0; i < value.length; i++) {
Number v = value[i];
cache.put(i, new AvgDataTypes(
v.byteValue(),
v.shortValue(),
v.intValue(),
v.longValue(),
new BigDecimal(v.toString()),
v.floatValue(),
v.doubleValue()));
}
try {
checkAvgWithVariousTypes(cache, false);
checkAvgWithVariousTypes(cache, true);
}
finally {
cache.destroy();
}
}
/**
* Check avg() with various data types.
*
* @param cache Cache.
* @param distinct Distinct flag.
*/
private void checkAvgWithVariousTypes(IgniteCache<Integer, AvgDataTypes> cache, boolean distinct) {
String qryText = String.format("select avg(%1$s byteField), avg(%1$s shortField), " +
"avg(%1$s intField), avg(%1$s longField), avg(%1$s decimalField), " +
"avg(%1$s floatField), avg(%1$s doubleField) from AvgDataTypes", distinct ? "distinct" : "");
SqlFieldsQuery qry = new SqlFieldsQuery(qryText);
List<List<?>> result = cache.query(qry).getAll();
List<?> row = result.get(0);
assertEquals((byte)13, row.get(0));
assertEquals((short)13, row.get(1));
assertEquals(13, row.get(2));
assertEquals(13L, row.get(3));
assertEquals(new BigDecimal("13.125"), row.get(4));
assertEquals(13.125f, row.get(5));
assertEquals(13.125d, row.get(6));
}
/** Simple query with aggregates */
private void checkSimpleQueryWithAggr(IgniteCache<Integer, Value> cache) {
try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
"SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd) FROM Value"))) {
List<List<?>> result = qry.getAll();
assertEquals(1, result.size());
List<?> row = result.get(0);
assertEquals("count", 15L, ((Number)row.get(0)).longValue());
assertEquals("sum", 30L, ((Number)row.get(1)).longValue());
assertEquals("avg", 2, ((Integer)row.get(2)).intValue());
assertEquals("min", 1, ((Integer)row.get(3)).intValue());
assertEquals("max", 3, ((Integer)row.get(4)).intValue());
}
}
/** Simple query with distinct aggregates */
private void checkSimpleQueryWithDistinctAggr(IgniteCache<Integer, Value> cache) {
try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
"SELECT count(distinct fst), sum(distinct snd), avg(distinct snd), min(distinct snd), max(distinct snd) " +
"FROM Value"))) {
List<List<?>> result = qry.getAll();
assertEquals(1, result.size());
List<?> row = result.get(0);
assertEquals("count distinct", 6L, ((Number)row.get(0)).longValue());
assertEquals("sum distinct", 6L, ((Number)row.get(1)).longValue());
assertEquals("avg distinct", 2, ((Integer)row.get(2)).intValue());
assertEquals("min distinct", 1, ((Integer)row.get(3)).intValue());
assertEquals("max distinct", 3, ((Integer)row.get(4)).intValue());
}
}
/** Simple query with distinct aggregates */
private void checkSimpleQueryWithAggrMixed(IgniteCache<Integer, Value> cache) {
try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
"SELECT count(fst), sum(snd), avg(snd), min(snd), max(snd)," +
"count(distinct fst), sum(distinct snd), avg(distinct snd), min(distinct snd), max(distinct snd) " +
"FROM Value"))) {
List<List<?>> result = qry.getAll();
assertEquals(1, result.size());
List<?> row = result.get(0);
assertEquals("count", 15L, ((Number)row.get(0)).longValue());
assertEquals("sum", 30L, ((Number)row.get(1)).longValue());
assertEquals("avg", 2, ((Integer)row.get(2)).intValue());
assertEquals("min", 1, ((Integer)row.get(3)).intValue());
assertEquals("max", 3, ((Integer)row.get(4)).intValue());
assertEquals("count distinct", 6L, ((Number)row.get(5)).longValue());
assertEquals("sum distinct", 6L, ((Number)row.get(6)).longValue());
assertEquals("avg distinct", 2, ((Integer)row.get(7)).intValue());
assertEquals("min distinct", 1, ((Integer)row.get(8)).intValue());
assertEquals("max distinct", 3, ((Integer)row.get(9)).intValue());
}
}
/** Query with aggregates and groups */
private void checkQueryWithGroupsAndAggrs(IgniteCache<Integer, Value> cache) {
try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
"SELECT fst, count(snd), sum(snd), avg(snd), avg(CAST(snd AS DOUBLE)), " +
"min(snd), max(snd) FROM Value GROUP BY fst ORDER BY fst"))) {
List<List<?>> result = qry.getAll();
assertEquals(6, result.size());
List<?> row = result.get(0);
assertEquals("fst", 1, ((Number)row.get(0)).intValue());
assertEquals("count", 3L, ((Number)row.get(1)).longValue());
assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
assertEquals("avg", 3, ((Number)row.get(3)).doubleValue(), 0.001);
assertEquals("avg dbl", 3d, ((Number)row.get(4)).doubleValue(), 0.001);
assertEquals("min", 3, ((Integer)row.get(5)).intValue());
assertEquals("max", 3, ((Integer)row.get(6)).intValue());
row = result.get(1);
assertEquals("fst", 2, ((Number)row.get(0)).intValue());
assertEquals("count", 3L, ((Number)row.get(1)).longValue());
assertEquals("sum", 6L, ((Number)row.get(2)).longValue());
assertEquals("avg", 2, ((Number)row.get(3)).doubleValue(), 0.001);
assertEquals("avg dbl", 2d, ((Number)row.get(4)).doubleValue(), 0.001);
assertEquals("min", 1, ((Integer)row.get(5)).intValue());
assertEquals("max", 3, ((Integer)row.get(6)).intValue());
row = result.get(2);
assertEquals("fst", 3, ((Number)row.get(0)).intValue());
assertEquals("count", 6L, ((Number)row.get(1)).longValue());
assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
assertEquals("avg", 1, ((Integer)row.get(3)).intValue());
assertEquals("avg dbl", 1.5d, ((Number)row.get(4)).doubleValue(), 0.001);
assertEquals("min", 1, ((Integer)row.get(5)).intValue());
assertEquals("max", 2, ((Integer)row.get(6)).intValue());
}
}
/** Query with distinct aggregates and groups */
private void checkQueryWithGroupsAndDistinctAggr(IgniteCache<Integer, Value> cache) {
try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
"SELECT count(distinct snd), sum(distinct snd), avg(distinct snd), " +
"avg(distinct cast(snd as double)), min(distinct snd), max(distinct snd) " +
"FROM Value GROUP BY fst"))) {
List<List<?>> result = qry.getAll();
assertEquals(6, result.size());
List<?> row = result.get(0);
assertEquals("count distinct", 1L, ((Number)row.get(0)).longValue());
assertEquals("sum distinct", 3L, ((Number)row.get(1)).longValue());
assertEquals("avg distinct", 3, ((Integer)row.get(2)).intValue());
assertEquals("avg distinct dbl", 3.0d, ((Number)row.get(3)).doubleValue(), 0.001);
assertEquals("min distinct", 3, ((Integer)row.get(4)).intValue());
assertEquals("max distinct", 3, ((Integer)row.get(5)).intValue());
row = result.get(1);
assertEquals("count distinct", 3L, ((Number)row.get(0)).longValue());
assertEquals("sum distinct", 6L, ((Number)row.get(1)).longValue());
assertEquals("avg distinct", 2, ((Integer)row.get(2)).intValue());
assertEquals("avg distinct dbl", 2.0d, ((Number)row.get(3)).doubleValue(), 0.001);
assertEquals("min distinct", 1, ((Integer)row.get(4)).intValue());
assertEquals("max distinct", 3, ((Integer)row.get(5)).intValue());
row = result.get(2);
assertEquals("count distinct", 2L, ((Number)row.get(0)).longValue());
assertEquals("sum distinct", 3L, ((Number)row.get(1)).longValue());
assertEquals("avg distinct", 1, ((Integer)row.get(2)).intValue());
assertEquals("avg distinct dbl", 1.5d, ((Number)row.get(3)).doubleValue(), 0.001);
assertEquals("min distinct", 1, ((Integer)row.get(4)).intValue());
assertEquals("max distinct", 2, ((Integer)row.get(5)).intValue());
}
}
/** Query with distinct aggregates and groups */
private void checkQueryWithGroupsAndAggrMixed(IgniteCache<Integer, Value> cache) {
try (QueryCursor<List<?>> qry = cache.query(new SqlFieldsQuery(
"SELECT fst, count(snd), sum(snd), avg(snd), avg(cast(snd as double)), min(snd), max(snd)," +
"count(distinct snd), sum(distinct snd), avg(distinct snd), avg(distinct cast(snd as double)), " +
"min(distinct snd), max(distinct snd) FROM Value GROUP BY fst"))) {
List<List<?>> result = qry.getAll();
assertEquals(6, result.size());
List<?> row = result.get(0);
assertEquals("fst", 1, ((Number)row.get(0)).intValue());
assertEquals("count", 3L, ((Number)row.get(1)).longValue());
assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
assertEquals("avg", 3, ((Integer)row.get(3)).intValue());
assertEquals("avg dbl", 3.0d, ((Number)row.get(4)).doubleValue(), 0.001);
assertEquals("min", 3, ((Integer)row.get(5)).intValue());
assertEquals("max", 3, ((Integer)row.get(6)).intValue());
assertEquals("count distinct", 1L, ((Number)row.get(7)).longValue());
assertEquals("sum distinct", 3L, ((Number)row.get(8)).longValue());
assertEquals("avg distinct", 3, ((Integer)row.get(9)).intValue());
assertEquals("avg distinct dbl", 3.0d, ((Number)row.get(10)).doubleValue(), 0.001);
assertEquals("min distinct", 3, ((Integer)row.get(11)).intValue());
assertEquals("max distinct", 3, ((Integer)row.get(12)).intValue());
row = result.get(1);
assertEquals("fst", 2, ((Number)row.get(0)).intValue());
assertEquals("count", 3L, ((Number)row.get(1)).longValue());
assertEquals("sum", 6L, ((Number)row.get(2)).longValue());
assertEquals("avg", 2, ((Integer)row.get(3)).intValue());
assertEquals("avg dbl", 2.0d, ((Number)row.get(4)).doubleValue(), 0.001);
assertEquals("min", 1, ((Integer)row.get(5)).intValue());
assertEquals("max", 3, ((Integer)row.get(6)).intValue());
assertEquals("count distinct", 3L, ((Number)row.get(7)).longValue());
assertEquals("sum distinct", 6L, ((Number)row.get(8)).longValue());
assertEquals("avg distinct", 2, ((Integer)row.get(9)).intValue());
assertEquals("avg distinct dbl", 2.0d, ((Number)row.get(10)).doubleValue(), 0.001);
assertEquals("min distinct", 1, ((Integer)row.get(11)).intValue());
assertEquals("max distinct", 3, ((Integer)row.get(12)).intValue());
row = result.get(2);
assertEquals("fst", 3, ((Number)row.get(0)).intValue());
assertEquals("count", 6L, ((Number)row.get(1)).longValue());
assertEquals("sum", 9L, ((Number)row.get(2)).longValue());
assertEquals("avg", 1, ((Integer)row.get(3)).intValue());
assertEquals("avg dbl", 1.5d, ((Number)row.get(4)).doubleValue(), 0.001);
assertEquals("min", 1, ((Integer)row.get(5)).intValue());
assertEquals("max", 2, ((Integer)row.get(6)).intValue());
assertEquals("count distinct", 2L, ((Number)row.get(7)).longValue());
assertEquals("sum distinct", 3L, ((Number)row.get(8)).longValue());
assertEquals("avg distinct", 1, ((Integer)row.get(9)).intValue());
assertEquals("avg distinct dbl", 1.5d, ((Number)row.get(10)).doubleValue(), 0.001);
assertEquals("min distinct", 1, ((Integer)row.get(11)).intValue());
assertEquals("max distinct", 2, ((Integer)row.get(12)).intValue());
}
}
/** */
private static class Value {
/** */
@QuerySqlField
private final Integer fst;
/** */
@QuerySqlField
private final Integer snd;
/** Constructor */
public Value(Integer fst, Integer snd) {
this.fst = fst;
this.snd = snd;
}
}
/**
*
*/
private static class AffinityKeyGenerator {
/** */
private final Affinity<Integer> affinity;
/** */
private final ClusterNode node;
/** */
private int start = 0;
/** Constructor */
AffinityKeyGenerator(Ignite node, String cacheName) {
this.affinity = node.affinity(cacheName);
this.node = node.cluster().localNode();
}
/** */
public Integer next() {
int key = start;
while (start < Integer.MAX_VALUE) {
if (affinity.isPrimary(node, key)) {
start = key + 1;
return key;
}
key++;
}
throw new IllegalStateException("Can't find next key");
}
}
/**
*
*/
public static class Person {
/** */
@QuerySqlField
private int id;
/** */
@QuerySqlField
private String name;
/** */
@QuerySqlField
private int depId;
}
/**
*
*/
public static class Org {
/** */
@QuerySqlField(index = true)
private int id;
/** */
@QuerySqlField
private String name;
}
/**
*
*/
public static class Department {
/** */
@QuerySqlField(index = true)
private int id;
/** */
@QuerySqlField(index = true)
private int orgId;
/** */
@QuerySqlField
private String name;
}
/**
* Test value.
*/
private static class GroupIndexTestValue implements Serializable {
/** */
@QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", order = 0))
private int a;
/** */
@QuerySqlField(orderedGroups = @QuerySqlField.Group(name = "grpIdx", order = 1))
private int b;
/**
* @param a A.
* @param b B.
*/
private GroupIndexTestValue(int a, int b) {
this.a = a;
this.b = b;
}
}
/**
*
*/
private static class Person2 implements Serializable {
/** */
@QuerySqlField(index = true)
int orgId;
/** */
@QuerySqlField(index = true)
String name;
/**
*
*/
public Person2() {
// No-op.
}
/**
* @param orgId Organization ID.
* @param name Name.
*/
public Person2(int orgId, String name) {
this.orgId = orgId;
this.name = name;
}
}
/**
*
*/
private static class TestKey implements Serializable {
/** */
@QuerySqlField(index = true)
@AffinityKeyMapped
int affKey;
/** */
@QuerySqlField()
int id;
/**
* @param affKey Affinity key.
* @param id ID.
*/
public TestKey(int affKey, int id) {
this.affKey = affKey;
this.id = id;
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
TestKey personKey = (TestKey)o;
return id == personKey.id;
}
/** {@inheritDoc} */
@Override public int hashCode() {
return id;
}
}
/**
*
*/
private static class Organization implements Serializable {
/** */
@QuerySqlField
String name;
/**
*
*/
public Organization() {
// No-op.
}
/**
* @param name Organization name.
*/
public Organization(String name) {
this.name = name;
}
}
/**
*
*/
private static class User implements Serializable {
/** */
@QuerySqlField
private int id;
}
/**
*
*/
private static class UserOrder implements Serializable {
/** */
@QuerySqlField
private int id;
/** */
@QuerySqlField
private int userId;
}
/**
*
*/
private static class OrderGood implements Serializable {
/** */
@QuerySqlField
private int orderId;
/** */
@QuerySqlField
private int goodId;
}
/** */
private static class Contract implements Serializable {
/** */
@QuerySqlField(index = true)
private final int CO_ID;
/** */
@QuerySqlField(index = true)
private final int CUSTOMER_ID;
/** */
public Contract(final int CO_ID, final int CUSTOMER_ID) {
this.CO_ID = CO_ID;
this.CUSTOMER_ID = CUSTOMER_ID;
}
}
/** */
public class PromoContract implements Serializable {
/** */
@QuerySqlField(index = true, orderedGroups = {
@QuerySqlField.Group(name = "myIdx", order = 1)})
private final int CO_ID;
/** */
@QuerySqlField(index = true, orderedGroups = {
@QuerySqlField.Group(name = "myIdx", order = 0)})
private final int OFFER_ID;
/** */
public PromoContract(final int co_Id, final int offer_Id) {
this.CO_ID = co_Id;
this.OFFER_ID = offer_Id;
}
}
/** */
public class AvgDataTypes {
/** */
@QuerySqlField
private Byte byteField;
/** */
@QuerySqlField
private Short shortField;
/** */
@QuerySqlField
private Integer intField;
/** */
@QuerySqlField
private Long longField;
/** */
@QuerySqlField
private BigDecimal decimalField;
/** */
@QuerySqlField
private Float floatField;
/** */
@QuerySqlField
private Double doubleField;
/** */
public AvgDataTypes(Byte byteField, Short shortField, Integer intField, Long longField,
BigDecimal decimalField, Float floatField, Double doubleField) {
this.byteField = byteField;
this.shortField = shortField;
this.intField = intField;
this.longField = longField;
this.decimalField = decimalField;
this.floatField = floatField;
this.doubleField = doubleField;
}
}
}