blob: 0a7ab8d70355d9f750c4142d3a383513c1e70818 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.sqltests;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.cache.Cache;
import javax.cache.CacheException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
* Test base for test for sql features.
*/
public class BaseSqlTest extends AbstractIndexingCommonTest {
/** Number of all employees. */
public static final long EMP_CNT = 1000L;
/** Number of all departments. */
public static final long DEP_CNT = 50L;
/** Number of all addresses. */
public static final long ADDR_CNT = 500L;
/** Number of employees that aren't associated with any department. */
public static final long FREE_EMP_CNT = 50;
/** Number of departments that don't have employees and addresses. */
public static final long FREE_DEP_CNT = 5;
/** Number of adderesses that are not associated with any departments. */
public static final long FREE_ADDR_CNT = 30;
/** Number of possible age values (width of ages values range). */
public static final int AGES_CNT = 50;
/** Name of client node. */
public static final String CLIENT_NODE_NAME = "clientNode";
/** Name of the Employee table cache. */
public static final String EMP_CACHE_NAME = "SQL_PUBLIC_EMPLOYEE";
/** Name of the Department table cache. */
public static final String DEP_CACHE_NAME = "SQL_PUBLIC_DEPARTMENT";
/** Name of the Address table cache. */
public static final String ADDR_CACHE_NAME = "SQL_PUBLIC_ADDRESS";
/** Client node instance. */
protected static IgniteEx client;
/** Node name of second server. */
public final String SRV2_NAME = "server2";
/** Node name of first server. */
public final String SRV1_NAME = "server1";
/** */
public static final String[] ALL_EMP_FIELDS = new String[] {"ID", "DEPID", "DEPIDNOIDX", "FIRSTNAME", "LASTNAME", "AGE", "SALARY"};
/** Flag that forces to do explain query in log before performing actual query. */
public static boolean explain = false;
/** Department table name. */
protected String DEP_TAB = "Department";
/** Random for generator. */
private Random rnd = new Random();
/**
* Makes configuration for client node.
*/
private IgniteConfiguration clientConfiguration() throws Exception {
IgniteConfiguration clCfg = getConfiguration(CLIENT_NODE_NAME);
clCfg.setClientMode(true);
return optimize(clCfg);
}
/**
* Fills tables with data.
*/
protected void fillCommonData() {
SqlFieldsQuery insEmp = new SqlFieldsQuery("INSERT INTO Employee VALUES (?, ?, ?, ?, ?, ?, ?)");
SqlFieldsQuery insConf = new SqlFieldsQuery("INSERT INTO Address VALUES (?, ?, ?, ?)");
fillDepartmentTable("Department");
for (long id = 0; id < EMP_CNT; id++) {
Long depId = (long)rnd.nextInt((int)(DEP_CNT - FREE_DEP_CNT));
if (id < FREE_EMP_CNT)
depId = null;
String firstName = UUID.randomUUID().toString();
String lastName = UUID.randomUUID().toString();
Integer age = rnd.nextInt(AGES_CNT) + 18;
Integer salary = rnd.nextInt(50) + 50;
execute(insEmp.setArgs(id, depId, depId, firstName, lastName, age, salary));
}
for (long addrId = 0; addrId < ADDR_CNT; addrId++) {
Long depId = (long)rnd.nextInt((int)(DEP_CNT - FREE_DEP_CNT));
if (addrId < FREE_ADDR_CNT)
depId = null;
String address = UUID.randomUUID().toString();
execute(insConf.setArgs(addrId, depId, depId, address));
}
}
/**
* Fills department table with test data.
*
* @param tabName name of department table.
*/
protected void fillDepartmentTable(String tabName) {
SqlFieldsQuery insDep = new SqlFieldsQuery("INSERT INTO " + tabName + " VALUES (?, ?, ?)");
for (long id = 0; id < DEP_CNT; id++) {
String name = UUID.randomUUID().toString();
execute(insDep.setArgs(id, id, name));
}
}
/**
* Creates common tables.
*
* @param commonParams Common parameters for the with clause (of CREATE TABLE), such as "template=partitioned".
*/
protected final void createTables(String commonParams) {
createEmployeeTable(commonParams);
createDepartmentTable(DEP_TAB, commonParams);
createAddressTable(commonParams);
}
/**
* Creates Address test table.
*
* @param commonParams Common params.
*/
protected void createAddressTable(String commonParams) {
execute("CREATE TABLE Address (" +
"id LONG PRIMARY KEY, " +
"depId LONG, " +
"depIdNoidx LONG, " +
"address VARCHAR" +
")" +
(F.isEmpty(commonParams) ? "" : " WITH \"" + commonParams + "\"") +
";");
execute("CREATE INDEX depIndex ON Address (depId)");
}
/**
* Creates table for department entity with specified table name.
*
* @param depTabName Department tab name.
* @param commonParams Common params.
*/
protected void createDepartmentTable(String depTabName, String commonParams) {
execute("CREATE TABLE " + depTabName + " (" +
"id LONG PRIMARY KEY," +
"idNoidx LONG, " +
"name VARCHAR" +
") " +
(F.isEmpty(commonParams) ? "" : " WITH \"" + commonParams + "\"") +
";");
}
/**
* Creates Employee test table.
*
* @param commonParams Common params.
*/
protected void createEmployeeTable(String commonParams) {
execute("CREATE TABLE Employee (" +
"id LONG, " +
"depId LONG, " +
"depIdNoidx LONG," +
"firstName VARCHAR, " +
"lastName VARCHAR, " +
"age INT, " +
"salary INT, " +
"PRIMARY KEY (id, depId)" +
") " +
"WITH \"affinity_key=depId" + (F.isEmpty(commonParams) ? "" : ", " + commonParams) + "\"" +
";");
execute("CREATE INDEX AgeIndex ON Employee (age)");
}
/**
* Sets up data. Override in children to add/change behaviour.
*/
protected void setupData() {
createTables(""); // default.
fillCommonData();
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGrid(SRV1_NAME, getConfiguration(SRV1_NAME), null);
startGrid(SRV2_NAME, getConfiguration(SRV2_NAME), null);
client = (IgniteEx)startGrid(CLIENT_NODE_NAME, clientConfiguration(), null);
boolean locExp = explain;
explain = false;
setupData();
explain = locExp;
}
/**
* Result of sql query. Contains metadata and all values in memory.
*/
static class Result {
/** Names of columns. */
private List<String> colNames;
/** Table */
private List<List<?>> vals;
/** */
public Result(List<String> colNames, List<List<?>> vals) {
this.colNames = colNames;
this.vals = vals;
}
/**
* @return metadata - name of columns.
*/
public List<String> columnNames() {
return colNames;
}
/**
* @return table, the actual data.
*/
public List<List<?>> values() {
return vals;
}
/**
* Creates result from cursor.
*
* @param cursor cursor to use to read column names and data.
* @return Result that contains data and metadata, fetched from cursor.
*/
public static Result fromCursor(FieldsQueryCursor<List<?>> cursor) {
List<String> cols = readColNames(cursor);
List<List<?>> vals = cursor.getAll();
return new Result(cols, vals);
}
}
/**
* Assert that results are sorted by comparator.
*
* @param vals values to check.
* @param cmp comparator to use.
* @param <T> any type.
*/
protected <T> void assertSortedBy(List<T> vals, Comparator<T> cmp) {
Iterator<T> it = vals.iterator();
if (!it.hasNext())
return;
T last = it.next();
while (it.hasNext()) {
T cur = it.next();
if (cmp.compare(last, cur) > 0)
throw new AssertionError("List is not sorted, element '" + last + "' is greater than '" +
cur + "'. List: " + vals);
}
}
/**
* Read colon names from cursor.
*
* @param cursor source of metadata.
* @return List containing colon names.
*/
private static List<String> readColNames(FieldsQueryCursor<?> cursor) {
ArrayList<String> colNames = new ArrayList<>();
for (int i = 0; i < cursor.getColumnsCount(); i++)
colNames.add(cursor.getFieldName(i));
return Collections.unmodifiableList(colNames);
}
/**
* Shortcut for {@link #executeFrom(SqlFieldsQuery, Ignite)}, that has String argument.
*/
protected Result executeFrom(String qry, Ignite node) {
return executeFrom(new SqlFieldsQuery(qry), node);
}
/**
* Shortcut for {@link #execute(SqlFieldsQuery)}.
*
* @param qry query string.
* @return number of changed rows.
*/
protected Result execute(String qry) {
return executeFrom(new SqlFieldsQuery(qry), client);
}
/**
* Performs query from client node.
*
* @param qry query.
* @return number of changed rows.
*/
protected Result execute(SqlFieldsQuery qry) {
return executeFrom(qry, client);
}
/**
* Execute query from node.
*
* @param qry query.
* @param node node to use to perform query.
* @return Result of query.
*/
protected final Result executeFrom(SqlFieldsQuery qry, Ignite node) {
if (explain) {
try {
SqlFieldsQuery explainQry = new SqlFieldsQuery(qry).setSql("EXPLAIN " + qry.getSql());
List<List<?>> res = ((IgniteEx)node).context().query().querySqlFields(explainQry, false).getAll();
String explanation = (String)res.get(0).get(0);
log.debug("Node: " + node.name() + ": Execution plan for query " + qry + ":\n" + explanation);
}
catch (Exception exc) {
log.error("Ignoring exception gotten explaining query : " + qry, exc);
}
}
FieldsQueryCursor<List<?>> cursor = ((IgniteEx)node).context().query().querySqlFields(qry, false);
return Result.fromCursor(cursor);
}
/**
* Assert that collections contain the equal elements.
*/
protected void assertContainsEq(Collection actual, Collection expected) {
assertContainsEq(null, actual, expected);
}
/**
* Assert that collections contain the equal elements ({@link Object#equals(Object)}), ignoring the order.
*
* @param msg message to add if assert fails.
* @param actual collection.
* @param expected collection.
*/
protected void assertContainsEq(String msg, Collection<?> actual, Collection<?> expected) {
if (F.isEmpty(msg))
msg = "Assertion failed.";
boolean eq = actual.size() == expected.size() && actual.containsAll(expected);
if (!eq) {
StringBuilder errMsg = new StringBuilder(msg + "\n");
errMsg.append("\texpectedSize=").append(expected.size()).append("\n");
errMsg.append("\tactualSize= ").append(actual.size()).append("\n");
Collection<?> expectedOnly = removeFromCopy(expected, actual);
Collection<?> actualOnly = removeFromCopy(actual, expected);
if (!expectedOnly.isEmpty()) {
errMsg.append("\texpectedOnly={\n");
for (Object row : expectedOnly)
errMsg.append("\t\t").append(row).append("\n");
errMsg.append("\t}\n");
}
if (!actualOnly.isEmpty()) {
errMsg.append("\tactualOnly={\n");
for (Object row : actualOnly)
errMsg.append("\t\t").append(row).append("\n");
errMsg.append("\t}\n");
}
throw new AssertionError(errMsg.toString());
}
if (actual.size() != expected.size())
throw new AssertionError(msg + " Collections contain different number of elements:" +
" [actual=" + actual + ", expected=" + expected + "].\n" +
"[uniqActual=]" + removeFromCopy(actual, expected) +
", uniqExpected=" + removeFromCopy(expected, actual) + "]");
if (!actual.containsAll(expected))
throw new AssertionError(msg + " Collections differ:" +
" [actual=" + actual + ", expected=" + expected + "].\n" +
"[uniqActual=]" + removeFromCopy(actual, expected) +
", uniqExpected=" + removeFromCopy(expected, actual) + "]");
}
/**
* Subtracts from the copy of one collection another collection.
* Number of "from" collection duplicates that will be removed, is equal to number of
* duplicates in "toRemove" collection.
*
* @param from Collection which copy is left argument of subtraction.
* @param toRemove Right argument of subtraction.
*/
private static Collection removeFromCopy(Collection<?> from, Collection<?> toRemove) {
List<?> fromCp = new ArrayList<>(from);
for (Object item : toRemove)
fromCp.remove(item);
return fromCp;
}
/**
* Performs scan query with fields projection.
*
* @param cache cache to query.
* @param filter filter for rows.
* @param fields to use in result (projection).
*/
protected static <K, V> List<List<Object>> select(
IgniteCache<K, V> cache,
@Nullable IgnitePredicate<Map<String, Object>> filter,
String... fields) {
IgniteClosure<Map<String, Object>, List<Object>> fieldsExtractor = row -> {
List<Object> res = new ArrayList<>();
for (String field : fields) {
String normField = field.toUpperCase();
if (!row.containsKey(normField)) {
throw new RuntimeException("Field with name " + normField +
" not found in the table. Avaliable fields: " + row.keySet());
}
Object val = row.get(normField);
res.add(val);
}
return res;
};
return select(cache, filter, fieldsExtractor);
}
/**
* Performs scan query with custom transformer (mapper).
*
* @param cache cache to query.
* @param filter filter for rows.
* @param transformer result mapper.
*/
@SuppressWarnings("unchecked")
protected static <K, V, R> List<R> select(
IgniteCache<K, V> cache,
@Nullable IgnitePredicate<Map<String, Object>> filter,
IgniteClosure<Map<String, Object>, R> transformer) {
Collection<QueryEntity> entities = cache.getConfiguration(CacheConfiguration.class).getQueryEntities();
assert entities.size() == 1 : "Cache should contain exactly one table";
final QueryEntity meta = entities.iterator().next();
IgniteClosure<Cache.Entry<K, V>, R> transformerAdapter = entry -> {
Map<String, Object> row = entryToMap(meta, entry.getKey(), entry.getValue());
return transformer.apply(row);
};
IgniteBiPredicate<K, V> filterAdapter = (filter == null) ? null :
(key, val) -> filter.apply(entryToMap(meta, key, val));
QueryCursor<R> cursor = cache.withKeepBinary()
.query(new ScanQuery<>(filterAdapter), transformerAdapter);
return cursor.getAll();
}
/**
* Transforms cache entry to map (column name -> value).
*
* @param meta Meta information (QueryEntity) about table.
* @param key Key of the cache entry.
* @param val Value of the cache entry.
*/
private static Map<String, Object> entryToMap(QueryEntity meta, Object key, Object val) {
Map<String, Object> row = new LinkedHashMap<>();
// Look up for the field in the key
if (key instanceof BinaryObject) {
BinaryObject compositeKey = (BinaryObject)key;
for (String field : compositeKey.type().fieldNames())
row.put(field, compositeKey.field(field));
}
else
row.put(meta.getKeyFieldName(), key);
// And in the value.
if (val instanceof BinaryObject) {
BinaryObject compositeVal = (BinaryObject)val;
for (String field : compositeVal.type().fieldNames())
row.put(field, compositeVal.field(field));
}
else
row.put(meta.getValueFieldName(), val);
return row;
}
/**
* Make collection to be distinct - put all in Set.
*
* @param src collection.
* @return Set with elements from {@code src} collection.
*/
public static Set<Object> distinct(Collection<?> src) {
return new HashSet<>(src);
}
/**
* Applies specified closure to each cluster node.
*/
protected void testAllNodes(Consumer<Ignite> consumer) {
for (Ignite node : Ignition.allGrids()) {
log.info("Testing on node " + node.name() + '.');
consumer.accept(node);
log.info("Testing on node " + node.name() + " is done.");
}
}
/**
* Check basic SELECT * query.
*/
@Test
public void testBasicSelect() {
testAllNodes(node -> {
Result emps = executeFrom("SELECT * FROM Employee", node);
assertContainsEq("SELECT * returned unexpected column names.", emps.columnNames(), Arrays.asList(ALL_EMP_FIELDS));
List<List<Object>> expEmps = select(node.cache(EMP_CACHE_NAME), null, emps.columnNames().toArray(new String[0]));
assertContainsEq(emps.values(), expEmps);
});
}
/**
* Check SELECT query with projection (fields).
*/
@Test
public void testSelectFields() {
testAllNodes(node -> {
Result res = executeFrom("SELECT firstName, id, age FROM Employee;", node);
String[] fields = {"FIRSTNAME", "ID", "AGE"};
assertEquals("Returned column names are incorrect.", res.columnNames(), Arrays.asList(fields));
List<List<Object>> expected = select(node.cache(EMP_CACHE_NAME), null, fields);
assertContainsEq(res.values(), expected);
});
}
/**
* Check basic BETWEEN operator usage.
*/
@Test
public void testSelectBetween() {
testAllNodes(node -> {
Result emps = executeFrom("SELECT * FROM Employee e WHERE e.id BETWEEN 101 and 200", node);
assertEquals("Fetched number of employees is incorrect", 100, emps.values().size());
String[] fields = emps.columnNames().toArray(new String[0]);
assertContainsEq("SELECT * returned unexpected column names.", emps.columnNames(), Arrays.asList(ALL_EMP_FIELDS));
IgnitePredicate<Map<String, Object>> between = row -> {
long id = (Long)row.get("ID");
return 101 <= id && id <= 200;
};
List<List<Object>> expected = select(node.cache(EMP_CACHE_NAME), between, fields);
assertContainsEq(emps.values(), expected);
});
}
/**
* Check BETWEEN operator filters out all the result (empty result set is expected).
*/
@Test
public void testEmptyBetween() {
testAllNodes(node -> {
Result emps = executeFrom("SELECT * FROM Employee e WHERE e.id BETWEEN 200 AND 101", node);
assertTrue("SQL should have returned empty result set, but it have returned: " + emps, emps.values().isEmpty());
});
}
/**
* Check SELECT IN with fixed values.
*/
@Test
public void testSelectInStatic() {
testAllNodes(node -> {
Result actual = executeFrom("SELECT age FROM Employee WHERE id IN (1, 256, 42)", node);
List<List<Object>> expected = select(node.cache(EMP_CACHE_NAME),
row -> {
Object id = row.get("ID");
return F.eq(id, 1L) || F.eq(id, 256L) || F.eq(id, 42L);
},
"AGE");
assertContainsEq(actual.values(), expected);
});
}
/**
* Check SELECT IN with simple subquery values.
*/
@Test
public void testSelectInSubquery() {
testAllNodes(node -> {
Result actual = executeFrom("SELECT lastName FROM Employee WHERE id in (SELECT id FROM Employee WHERE age < 30)", node);
List<List<Object>> expected = select(node.cache(EMP_CACHE_NAME), row -> (Integer)row.get("AGE") < 30, "lastName");
assertContainsEq(actual.values(), expected);
});
}
/**
* Check ORDER BY operator with varchar field.
*/
@Test
public void testBasicOrderByLastName() {
testAllNodes(node -> {
Result result = executeFrom("SELECT * FROM Employee e ORDER BY e.lastName", node);
List<List<Object>> exp = select(node.cache(EMP_CACHE_NAME), null, result.columnNames().toArray(new String[0]));
assertContainsEq(result.values(), exp);
int lastNameIdx = result.columnNames().indexOf("LASTNAME");
Comparator<List<?>> asc = Comparator.comparing((List<?> row) -> (String)row.get(lastNameIdx));
assertSortedBy(result.values(), asc);
});
}
/**
* Check DISTINCT operator selecting not unique field.
*/
@Test
public void testBasicDistinct() {
testAllNodes(node -> {
Result ages = executeFrom("SELECT DISTINCT age FROM Employee", node);
Set<Object> expected = distinct(select(node.cache(EMP_CACHE_NAME), null, "age"));
assertContainsEq("Values in cache differ from values returned from sql.", ages.values(), expected);
});
}
/**
* Check simple WHERE operator.
*/
@Test
public void testDistinctWithWhere() {
testAllNodes(node -> {
Result ages = executeFrom("SELECT DISTINCT age FROM Employee WHERE id < 100", node);
Set<Object> expAges = distinct(select(node.cache(EMP_CACHE_NAME), row -> (Long)row.get("ID") < 100, "age"));
assertContainsEq(ages.values(), expAges);
});
}
/**
* Check greater operator in where clause with both indexed and non-indexed field.
*/
@Test
public void testWhereGreater() {
testAllNodes(node -> {
Result idxActual = executeFrom("SELECT firstName FROM Employee WHERE age > 30", node);
Result noidxActual = executeFrom("SELECT firstName FROM Employee WHERE salary > 75", node);
IgniteCache<Object, Object> cache = node.cache(EMP_CACHE_NAME);
List<List<Object>> idxExp = select(cache, row -> (Integer)row.get("AGE") > 30, "firstName");
List<List<Object>> noidxExp = select(cache, row -> (Integer)row.get("SALARY") > 75, "firstName");
assertContainsEq(idxActual.values(), idxExp);
assertContainsEq(noidxActual.values(), noidxExp);
});
}
/**
* Check less operator in where clause with both indexed and non-indexed field.
*/
@Test
public void testWhereLess() {
testAllNodes(node -> {
Result idxActual = executeFrom("SELECT firstName FROM Employee WHERE age < 30", node);
Result noidxActual = executeFrom("SELECT firstName FROM Employee WHERE salary < 75", node);
IgniteCache<Object, Object> cache = node.cache(EMP_CACHE_NAME);
List<List<Object>> idxExp = select(cache, row -> (Integer)row.get("AGE") < 30, "firstName");
List<List<Object>> noidxExp = select(cache, row -> (Integer)row.get("SALARY") < 75, "firstName");
assertContainsEq(idxActual.values(), idxExp);
assertContainsEq(noidxActual.values(), noidxExp);
});
}
/**
* Check equals operator in where clause with both indexed and non-indexed field.
*/
@Test
public void testWhereEq() {
testAllNodes(node -> {
Result idxActual = executeFrom("SELECT firstName FROM Employee WHERE age = 30", node);
Result noidxActual = executeFrom("SELECT firstName FROM Employee WHERE salary = 75", node);
IgniteCache<Object, Object> cache = node.cache(EMP_CACHE_NAME);
List<List<Object>> idxExp = select(cache, row -> (Integer)row.get("AGE") == 30, "firstName");
List<List<Object>> noidxExp = select(cache, row -> (Integer)row.get("SALARY") == 75, "firstName");
assertContainsEq(idxActual.values(), idxExp);
assertContainsEq(noidxActual.values(), noidxExp);
});
}
/**
* Check GROUP BY operator with indexed field.
*/
@Test
public void testGroupByIndexedField() {
testAllNodes(node -> {
// Need to filter out only part of records (each one is a count of employees
// of particular age) in HAVING clause.
final int avgAge = (int)(EMP_CNT / AGES_CNT);
Result result = executeFrom("SELECT age, COUNT(*) FROM Employee GROUP BY age HAVING COUNT(*) > " + avgAge, node);
List<List<Object>> all = select(node.cache(EMP_CACHE_NAME), null, "age");
Map<Integer, Long> cntGroups = new HashMap<>();
for (List<Object> entry : all) {
Integer age = (Integer)entry.get(0);
long cnt = cntGroups.getOrDefault(age, 0L);
cntGroups.put(age, cnt + 1L);
}
List<List<Object>> expected = cntGroups.entrySet().stream()
.filter(ent -> ent.getValue() > avgAge)
.map(ent -> Arrays.<Object>asList(ent.getKey(), ent.getValue()))
.collect(Collectors.toList());
assertContainsEq(result.values(), expected);
});
}
/**
* Check GROUP BY operator with indexed field.
*/
@Test
public void testGroupByNonIndexedField() {
testAllNodes(node -> {
// Need to filter out only part of records (each one is a count of employees
// associated with particular department id) in HAVING clause.
final int avgDep = (int)((EMP_CNT - FREE_EMP_CNT) / (DEP_CNT - FREE_DEP_CNT));
Result result = executeFrom(
"SELECT depId, COUNT(*) " +
"FROM Employee " +
"GROUP BY depIdNoidx " +
"HAVING COUNT(*) > " + avgDep, node);
List<List<Object>> all = select(node.cache(EMP_CACHE_NAME), null, "depId");
Map<Long, Long> cntGroups = new HashMap<>();
for (List<Object> entry : all) {
Long depId = (Long)entry.get(0);
long cnt = cntGroups.getOrDefault(depId, 0L);
cntGroups.put(depId, cnt + 1L);
}
List<List<Object>> expected = cntGroups.entrySet().stream()
.filter(ent -> ent.getValue() > avgDep)
.map(ent -> Arrays.<Object>asList(ent.getKey(), ent.getValue()))
.collect(Collectors.toList());
assertContainsEq(result.values(), expected);
});
}
/**
* Performs generic join operation of two tables.
* If either outerLeft or outerRight is true, empty map will be passed to transformer argument for rows that
* don't have matches in the other table.
*
* @param left Cache of left table.
* @param right Cache of the right table.
* @param filter Filter, corresponds to ON sql clause.
* @param transformer Transformer (mapper) to make sql projection (select fields for example).
* @param outerLeft Preserve every row from the left table even if there is no matches in the right table.
* @param outerRight Same as outerLeft for right table.
*/
public static <R> List<R> doCommonJoin(
IgniteCache<?, ?> left,
IgniteCache<?, ?> right,
IgniteBiPredicate<Map<String, Object>, Map<String, Object>> filter,
IgniteBiClosure<Map<String, Object>, Map<String, Object>, R> transformer,
boolean outerLeft,
boolean outerRight) {
List<Map<String, Object>> leftTab = select(left, null, x -> x);
List<Map<String, Object>> rightTab = select(right, null, x -> x);
final Map<String, Object> nullRow = Collections.emptyMap();
List<R> join = new ArrayList<>();
Set<Map<String, Object>> notFoundRight = Collections.newSetFromMap(new IdentityHashMap<>());
notFoundRight.addAll(rightTab);
for (Map<String, Object> lRow : leftTab) {
boolean foundLeft = false;
for (Map<String, Object> rRow : rightTab) {
if (filter.apply(lRow, rRow)) {
foundLeft = true;
notFoundRight.remove(rRow);
join.add(transformer.apply(lRow, rRow));
}
}
if (!foundLeft && outerLeft)
join.add(transformer.apply(lRow, nullRow));
}
if (outerRight) {
for (Map<String, Object> rRow : notFoundRight)
join.add(transformer.apply(nullRow, rRow));
}
return join;
}
/**
* Do right join.
*
* @param left Left cache.
* @param right Right cache.
* @param filter Filter.
* @param transformer Transformer.
* @return Result.
*/
protected static <R> List<R> doRightJoin(
IgniteCache<?, ?> left,
IgniteCache<?, ?> right,
IgniteBiPredicate<Map<String, Object>, Map<String, Object>> filter,
IgniteBiClosure<Map<String, Object>, Map<String, Object>, R> transformer) {
return doCommonJoin(left, right, filter, transformer, false, true);
}
/**
* Do left join.
*
* @param left Left cache.
* @param right Right cache.
* @param filter Filter.
* @param transformer Transformer.
* @return Result.
*/
protected static <R> List<R> doLeftJoin(
IgniteCache<?, ?> left,
IgniteCache<?, ?> right,
IgniteBiPredicate<Map<String, Object>, Map<String, Object>> filter,
IgniteBiClosure<Map<String, Object>, Map<String, Object>, R> transformer) {
return doCommonJoin(left, right, filter, transformer, true, false);
}
/**
* Do inner join.
*
* @param left Left cache.
* @param right Right cache.
* @param filter Filter.
* @param transformer Transformer.
* @return Result.
*/
protected static <R> List<R> doInnerJoin(
IgniteCache<?, ?> left,
IgniteCache<?, ?> right,
IgniteBiPredicate<Map<String, Object>, Map<String, Object>> filter,
IgniteBiClosure<Map<String, Object>, Map<String, Object>, R> transformer) {
return doCommonJoin(left, right, filter, transformer, false, false);
}
/**
* Assert that exception about incorrect index in distributed join query is thrown.
*
* @param joinCmd command that performs slq join operation.
*/
@SuppressWarnings("ThrowableNotThrown")
protected void assertDistJoinHasIncorrectIndex(Callable<?> joinCmd) {
GridTestUtils.assertThrows(log,
joinCmd,
CacheException.class,
"Failed to prepare distributed join query: join condition does not use index");
}
/**
* Verify result of inner join of employee and department tables.
*
* @param depTab name of the department table.
*/
public void checkInnerJoinEmployeeDepartment(String depTab) {
Arrays.asList(true, false).forEach(forceOrd -> testAllNodes(node -> {
String qryTpl = "SELECT e.id as EmpId, e.firstName as EmpName, d.id as DepId, d.name as DepName " +
"FROM Employee e INNER JOIN " + depTab + " d " +
"ON e.%s = d.%s";
Result actIdxOnOn = executeFrom(joinQry(forceOrd, qryTpl, "depId", "id"), node);
Result actIdxOnOff = executeFrom(joinQry(forceOrd, qryTpl, "depId", "idNoidx"), node);
Result actIdxOffOn = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "id"), node);
Result actIdxOffOff = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "idNoidx"), node);
List<List<Object>> expected = doInnerJoin(node.cache(EMP_CACHE_NAME), node.cache(cacheName(depTab)),
(emp, dep) -> sqlEq(emp.get("DEPID"), dep.get("ID")),
(emp, dep) -> Arrays.asList(emp.get("ID"), emp.get("FIRSTNAME"), dep.get("ID"), dep.get("NAME")));
assertContainsEq("Join on idx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOn.values(), expected);
assertContainsEq("Join on idx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOff.values(), expected);
assertContainsEq("Join on noidx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOn.values(), expected);
assertContainsEq("Join on noidx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOff.values(), expected);
}));
}
/**
* Check INNER JOIN with collocated data.
*/
@Test
public void testInnerJoinEmployeeDepartment() {
checkInnerJoinEmployeeDepartment(DEP_TAB);
}
/**
* Check LEFT JOIN with collocated data of department and employee tables.
*
* @param depTab department table name.
*/
public void checkInnerJoinDepartmentEmployee(String depTab) {
Arrays.asList(true, false).forEach(forceOrd -> testAllNodes(node -> {
String qryTpl = "SELECT e.id as EmpId, e.firstName as EmpName, d.id as DepId, d.name as DepName " +
"FROM " + depTab + " d INNER JOIN Employee e " +
"ON e.%s = d.%s";
Result actIdxOnOn = executeFrom(joinQry(forceOrd, qryTpl, "depId", "id"), node);
Result actIdxOnOff = executeFrom(joinQry(forceOrd, qryTpl, "depId", "idNoidx"), node);
Result actIdxOffOn = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "id"), node);
Result actIdxOffOff = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "idNoidx"), node);
List<List<Object>> expected = doInnerJoin(node.cache(EMP_CACHE_NAME), node.cache(cacheName(depTab)),
(emp, dep) -> sqlEq(emp.get("DEPID"), dep.get("ID")),
(emp, dep) -> Arrays.asList(emp.get("ID"), emp.get("FIRSTNAME"), dep.get("ID"), dep.get("NAME")));
assertContainsEq("Join on idx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOn.values(), expected);
assertContainsEq("Join on idx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOff.values(), expected);
assertContainsEq("Join on noidx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOn.values(), expected);
assertContainsEq("Join on noidx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOff.values(), expected);
}));
}
/**
* Check LEFT JOIN with collocated data of employee and department tables.
*
* @param depTab department table name.
*/
public void checkLeftJoinEmployeeDepartment(String depTab) {
Arrays.asList(true, false).forEach(forceOrd -> testAllNodes(node -> {
String qryTpl = "SELECT e.id as EmpId, e.firstName as EmpName, d.id as DepId, d.name as DepName " +
"FROM Employee e LEFT JOIN " + depTab + " d " +
"ON e.%s = d.%s";
Result actIdxOnOn = executeFrom(joinQry(forceOrd, qryTpl, "depId", "id"), node);
Result actIdxOnOff = executeFrom(joinQry(forceOrd, qryTpl, "depId", "idNoidx"), node);
Result actIdxOffOn = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "id"), node);
Result actIdxOffOff = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "idNoidx"), node);
List<List<Object>> expected = doLeftJoin(node.cache(EMP_CACHE_NAME), node.cache(cacheName(depTab)),
(emp, dep) -> sqlEq(emp.get("DEPID"), dep.get("ID")),
(emp, dep) -> Arrays.asList(emp.get("ID"), emp.get("FIRSTNAME"), dep.get("ID"), dep.get("NAME")));
assertContainsEq("Join on idx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOn.values(), expected);
assertContainsEq("Join on idx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOff.values(), expected);
assertContainsEq("Join on noidx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOn.values(), expected);
assertContainsEq("Join on noidx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOff.values(), expected);
}));
}
/**
* Check LEFT JOIN with collocated data of department and employee tables.
*
* @param depTab department table name.
*/
public void checkLeftJoinDepartmentEmployee(String depTab) {
Arrays.asList(true, false).forEach(forceOrd -> testAllNodes(node -> {
String qryTpl = "SELECT e.id as EmpId, e.firstName as EmpName, d.id as DepId, d.name as DepName " +
"FROM " + depTab + " d LEFT JOIN Employee e " +
"ON e.%s = d.%s";
Result actIdxOnOn = executeFrom(joinQry(forceOrd, qryTpl, "depId", "id"), node);
Result actIdxOnOff = executeFrom(joinQry(forceOrd, qryTpl, "depId", "idNoidx"), node);
Result actIdxOffOn = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "id"), node);
Result actIdxOffOff = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "idNoidx"), node);
List<List<Object>> expected = doLeftJoin(node.cache(cacheName(depTab)), node.cache(EMP_CACHE_NAME),
(dep, emp) -> sqlEq(emp.get("DEPID"), dep.get("ID")),
(dep, emp) -> Arrays.asList(emp.get("ID"), emp.get("FIRSTNAME"), dep.get("ID"), dep.get("NAME")));
assertContainsEq("Join on idx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOn.values(), expected);
assertContainsEq("Join on idx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOff.values(), expected);
assertContainsEq("Join on noidx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOn.values(), expected);
assertContainsEq("Join on noidx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOff.values(), expected);
}));
}
/**
* Check LEFT JOIN with collocated data.
*/
@Test
public void testLeftJoin() {
checkLeftJoinEmployeeDepartment(DEP_TAB);
}
/**
* Check RIGHT JOIN with collocated data of employee and department tables.
*
* @param depTab department table name.
*/
public void checkRightJoinEmployeeDepartment(String depTab) {
Arrays.asList(true, false).forEach(forceOrd -> testAllNodes(node -> {
String qryTpl = "SELECT e.id as EmpId, e.firstName as EmpName, d.id as DepId, d.name as DepName " +
"FROM Employee e RIGHT JOIN " + depTab + " d " +
"ON e.%s = d.%s";
Result actIdxOnOn = executeFrom(joinQry(forceOrd, qryTpl, "depId", "id"), node);
Result actIdxOnOff = executeFrom(joinQry(forceOrd, qryTpl, "depId", "idNoidx"), node);
Result actIdxOffOn = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "id"), node);
Result actIdxOffOff = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "idNoidx"), node);
List<List<Object>> expected = doRightJoin(node.cache(EMP_CACHE_NAME), node.cache(cacheName(depTab)),
(emp, dep) -> sqlEq(emp.get("DEPID"), dep.get("ID")),
(emp, dep) -> Arrays.asList(emp.get("ID"), emp.get("FIRSTNAME"), dep.get("ID"), dep.get("NAME")));
assertContainsEq("Join on idx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOn.values(), expected);
assertContainsEq("Join on idx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOff.values(), expected);
assertContainsEq("Join on noidx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOn.values(), expected);
assertContainsEq("Join on noidx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOff.values(), expected);
}));
}
/**
* Check RIGHT JOIN with collocated data of department and employee tables.
*
* @param depTab department table name.
*/
public void checkRightJoinDepartmentEmployee(String depTab) {
Arrays.asList(true, false).forEach(forceOrd -> testAllNodes(node -> {
String qryTpl = "SELECT e.id as EmpId, e.firstName as EmpName, d.id as DepId, d.name as DepName " +
"FROM " + depTab + " d RIGHT JOIN Employee e " +
"ON e.%s = d.%s";
Result actIdxOnOn = executeFrom(joinQry(forceOrd, qryTpl, "depId", "id"), node);
Result actIdxOnOff = executeFrom(joinQry(forceOrd, qryTpl, "depId", "idNoidx"), node);
Result actIdxOffOn = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "id"), node);
Result actIdxOffOff = executeFrom(joinQry(forceOrd, qryTpl, "depIdNoidx", "idNoidx"), node);
// expected in reversed order.
List<List<Object>> expected = doRightJoin(node.cache(cacheName(depTab)), node.cache(EMP_CACHE_NAME),
(dep, emp) -> sqlEq(emp.get("DEPID"), dep.get("ID")),
(dep, emp) -> Arrays.asList(emp.get("ID"), emp.get("FIRSTNAME"), dep.get("ID"), dep.get("NAME")));
assertContainsEq("Join on idx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOn.values(), expected);
assertContainsEq("Join on idx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOnOff.values(), expected);
assertContainsEq("Join on noidx = idx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOn.values(), expected);
assertContainsEq("Join on noidx = noidx is incorrect. " +
"Preserve join order = " + forceOrd + ".", actIdxOffOff.values(), expected);
}));
}
/**
* Check RIGHT JOIN with collocated data.
*/
@Test
public void testRightJoin() {
checkRightJoinEmployeeDepartment(DEP_TAB);
}
/**
* Check that FULL OUTER JOIN (which is currently unsupported) causes valid error message.
*/
@SuppressWarnings("ThrowableNotThrown")
@Test
public void testFullOuterJoinIsNotSupported() {
testAllNodes(node -> {
String fullOuterJoinQry = "SELECT e.id as EmpId, e.firstName as EmpName, d.id as DepId, d.name as DepName " +
"FROM Employee e FULL OUTER JOIN Department d " +
"ON e.depId = d.id";
GridTestUtils.assertThrows(log, () -> executeFrom(fullOuterJoinQry, node),
IgniteSQLException.class, "Failed to parse query.");
String fullOuterJoinSubquery = "SELECT EmpId from (" + fullOuterJoinQry + ")";
GridTestUtils.assertThrows(log, () -> executeFrom(fullOuterJoinSubquery, node),
IgniteSQLException.class, "Failed to parse query.");
});
}
/**
* Check that distributed FULL OUTER JOIN (which is currently unsupported) causes valid error message.
*/
@SuppressWarnings("ThrowableNotThrown")
@Test
public void testFullOuterDistributedJoinIsNotSupported() {
testAllNodes(node -> {
String qry = "SELECT d.id, d.name, a.address " +
"FROM Department d FULL OUTER JOIN Address a " +
"ON d.idNoidx = a.depIdNoidx";
GridTestUtils.assertThrows(log,
() -> executeFrom(distributedJoinQry(false, qry), node),
IgniteSQLException.class, "Failed to parse query.");
});
}
/**
* Returns true if arguments are equal in terms of sql: if both arguments are not null and content is equal.
* Note that null is not equal to null.
*/
public static boolean sqlEq(Object a, Object b) {
if (a == null || b == null)
return false;
return a.equals(b);
}
/**
* Sets explain flag. If flag is set, execute methods will perform explain query before actual query execution.
* Query plan is logged.
*
* @param shouldExplain explain flag value.
*/
public static void setExplain(boolean shouldExplain) {
explain = shouldExplain;
}
/**
* Get cache name by name of created by DDL table.
*
* @param tabName name of DDL created table.
* @return cache name.
*/
static String cacheName(String tabName) {
return "SQL_PUBLIC_" + tabName.toUpperCase();
}
/**
* Creates SqlFieldsQuery with specified enforce join order flag and specified sql from template.
*
* @param enforceJoinOrder don't let engine to reorder tables in join.
* @param tpl query template in java format.
* @param args arguments for the template.
*/
static SqlFieldsQuery joinQry(boolean enforceJoinOrder, String tpl, Object... args) {
return new SqlFieldsQuery(String.format(tpl, args)).setEnforceJoinOrder(enforceJoinOrder);
}
/**
* Creates SqlFieldsQuery with enabled distributed joins,
* specified enforce join order flag and specified sql from template.
*
* @param enforceJoinOrder don't let engine to reorder tables in join.
* @param tpl query template in java format.
* @param args arguments for the template.
*/
static SqlFieldsQuery distributedJoinQry(boolean enforceJoinOrder, String tpl, Object... args) {
return joinQry(enforceJoinOrder, tpl, args).setDistributedJoins(true);
}
}