// blob: b34a496afd84a9cf023ca28a7ccca1916e3de482 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.MessageFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheQueryExecutedEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
import org.apache.ignite.lang.IgnitePredicate;
import org.junit.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
/** Tests for query partitions derivation. */
public class IgniteSqlRoutingTest extends AbstractIndexingCommonTest {
/** Instance name of the client grid; the tests obtain their caches and run their queries through it. */
private static final String NODE_CLIENT = "client";
/** Name of the cache configured with {@code Integer} keys and {@code Person} values. */
private static final String CACHE_PERSON = "Person";
/** Name of the cache configured with {@code CallKey} keys and {@code Call} values. */
private static final String CACHE_CALL = "Call";
/** Number of grids started via {@code startGrids}; the client grid is started in addition to these. */
private static final int NODE_COUNT = 4;
/** Broadcast query to ensure events came from all nodes; the {@code {0}} placeholder is filled with the cache name. */
private static final String FINAL_QRY = "select count(1) from {0} where name=?";
/** Param to distinguish the final query event. */
private static final String FINAL_QRY_PARAM = "Abracadabra";
/**
 * {@inheritDoc}
 * <p>
 * On top of the parent configuration this sets a binary marshaller, registers the
 * {@link #CACHE_PERSON} and {@link #CACHE_CALL} caches (plus a cache built from the grid name
 * when {@code buildCacheConfiguration} returns one for it), registers a key configuration for
 * {@code CallKey} and enables all event types.
 */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(gridName);

    cfg.setMarshaller(new BinaryMarshaller());

    List<CacheConfiguration> caches = new ArrayList<>();

    // Optional cache derived from the grid name; null means there is none.
    CacheConfiguration gridCache = buildCacheConfiguration(gridName);

    if (gridCache != null)
        caches.add(gridCache);

    for (String cacheName : new String[] {CACHE_PERSON, CACHE_CALL})
        caches.add(buildCacheConfiguration(cacheName));

    CacheConfiguration[] cacheArr = caches.toArray(new CacheConfiguration[caches.size()]);

    cfg.setCacheConfiguration(cacheArr);
    cfg.setCacheKeyConfiguration(new CacheKeyConfiguration(CallKey.class));
    cfg.setIncludeEventTypes(EventType.EVTS_ALL);

    return cfg;
}
/**
 * {@inheritDoc}
 * <p>
 * Starts {@link #NODE_COUNT} grids and one client grid, waits for partition map exchange
 * and then fills the test caches.
 */
@Override protected void beforeTestsStarted() throws Exception {
super.beforeTestsStarted();
startGrids(NODE_COUNT);
// The client grid is started after the other grids; the tests run their queries through it.
startClientGrid(NODE_CLIENT);
// Wait for partition map exchange before any data is loaded.
awaitPartitionMapExchange();
fillCaches();
}
/**
 * Builds the configuration of one of the predefined test caches.
 *
 * @param name Cache name.
 * @return Partitioned cache configuration with a single query entity for {@link #CACHE_PERSON}
 *      or {@link #CACHE_CALL}, {@code null} for any other name.
 */
private CacheConfiguration buildCacheConfiguration(String name) {
    QueryEntity entity;

    // Insertion order of the map is the order in which the fields were declared.
    LinkedHashMap<String, String> fields = new LinkedHashMap<>();

    switch (name) {
        case CACHE_PERSON:
            entity = new QueryEntity();

            entity.setKeyType(Integer.class.getName());
            entity.setValueType(Person.class.getName());

            fields.put("name", String.class.getName());
            fields.put("age", Integer.class.getName());

            break;

        case CACHE_CALL:
            entity = new QueryEntity(CallKey.class.getName(), Call.class.getName());

            Set<String> keyFields = new HashSet<>(Arrays.asList("personId", "id"));

            entity.setKeyFields(keyFields);

            fields.put("personId", Integer.class.getName());
            fields.put("id", Integer.class.getName());
            fields.put("name", String.class.getName());
            fields.put("duration", Integer.class.getName());

            break;

        default:
            // Not one of the predefined caches.
            return null;
    }

    entity.setFields(fields);

    CacheConfiguration ccfg = new CacheConfiguration(name);

    ccfg.setCacheMode(CacheMode.PARTITIONED);
    ccfg.setQueryEntities(Arrays.asList(entity));

    return ccfg;
}
/**
 * Affinity key ({@code personId}) compared with a literal: exactly one 'query executed' event is
 * expected and both calls of person 100 must be returned in {@code id} order.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQuerySelectAffinityKeyEqualsConstant() throws Exception {
    SqlFieldsQuery qry =
        new SqlFieldsQuery("select id, name, duration from Call where personId=100 order by id");

    IgniteCache<CallKey, Call> callCache = grid(NODE_CLIENT).cache(CACHE_CALL);

    List<List<?>> rows = runQueryEnsureUnicast(callCache, qry, 1);

    assertEquals(2, rows.size());

    checkResultsRow(rows, 0, 1, "caller1", 100);
    checkResultsRow(rows, 1, 2, "caller2", 200);
}
/**
 * Affinity key ({@code personId}) compared with a query parameter: exactly one 'query executed'
 * event is expected and both calls of person 100 must be returned in {@code id} order.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQuerySelectAffinityKeyEqualsParameter() throws Exception {
    SqlFieldsQuery qry =
        new SqlFieldsQuery("select id, name, duration from Call where personId=? order by id");

    qry.setArgs(100);

    IgniteCache<CallKey, Call> callCache = grid(NODE_CLIENT).cache(CACHE_CALL);

    List<List<?>> rows = runQueryEnsureUnicast(callCache, qry, 1);

    assertEquals(2, rows.size());

    checkResultsRow(rows, 0, 1, "caller1", 100);
    checkResultsRow(rows, 1, 2, "caller2", 200);
}
/**
 * Runs the same {@code _key=?} query text with several different keys; every run must produce
 * exactly one 'query executed' event and return the person stored under that key.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQuerySelectKeyEqualsParameterReused() throws Exception {
    IgniteCache<Integer, Person> personCache = grid(NODE_CLIENT).cache(CACHE_PERSON);

    int[] keys = {0, 250, 500, 750, 1000};

    for (int i = 0; i < keys.length; i++) {
        int key = keys[i];

        SqlFieldsQuery qry = new SqlFieldsQuery("select name, age from Person where _key=?").setArgs(key);

        List<List<?>> rows = runQueryEnsureUnicast(personCache, qry, 1);

        assertEquals(1, rows.size());

        // Compare against the value read directly from the cache.
        Person expected = personCache.get(key);

        checkResultsRow(rows, 0, expected.name, expected.age);
    }
}
/**
 * Whole composite key passed as the {@code _key} parameter: exactly one 'query executed' event is
 * expected and the row must match the call stored under that key.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQuerySelectKeyEqualsParameter() throws Exception {
    CallKey key = new CallKey(5, 1);

    SqlFieldsQuery qry = new SqlFieldsQuery("select name, duration from Call where _key=?");

    qry.setArgs(key);

    IgniteCache<CallKey, Call> callCache = grid(NODE_CLIENT).cache(CACHE_CALL);

    List<List<?>> rows = runQueryEnsureUnicast(callCache, qry, 1);

    assertEquals(1, rows.size());

    Call expected = callCache.get(key);

    checkResultsRow(rows, 0, expected.name, expected.duration);
}
/**
 * Check group, having, ordering allowed to be unicast requests.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQueryGroups() throws Exception {
    String sql = "select name, count(1) from Call where personId = ? " +
        "group by name having count(1) = 1 order by name";

    SqlFieldsQuery grpQry = new SqlFieldsQuery(sql).setArgs(10);

    IgniteCache<CallKey, Call> callCache = grid(NODE_CLIENT).cache(CACHE_CALL);

    List<List<?>> rows = runQueryEnsureUnicast(callCache, grpQry, 1);

    assertEquals(2, rows.size());

    // One group per caller name, each with a single call.
    checkResultsRow(rows, 0, "caller1", 1L);
    checkResultsRow(rows, 1, "caller2", 1L);
}
/**
 * {@code _key=?} combined through AND with a condition on a regular field: exactly one
 * 'query executed' event is expected.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQuerySelectKeyEqualAndFieldParameter() throws Exception {
    CallKey key = new CallKey(5, 1);

    SqlFieldsQuery qry = new SqlFieldsQuery("select name, duration from Call where _key=? and duration=?");

    qry.setArgs(key, 100);

    IgniteCache<CallKey, Call> callCache = grid(NODE_CLIENT).cache(CACHE_CALL);

    List<List<?>> rows = runQueryEnsureUnicast(callCache, qry, 1);

    assertEquals(1, rows.size());

    Call expected = callCache.get(key);

    checkResultsRow(rows, 0, expected.name, expected.duration);
}
/**
 * Two {@code _key=?} conditions joined with OR (one of them additionally restricted by a field):
 * exactly two 'query executed' events are expected, and each returned row must match the stored call.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQuerySelect2KeyEqualsAndFieldParameter() throws Exception {
    CallKey firstKey = new CallKey(5, 1);
    CallKey secondKey = new CallKey(1000, 1);

    SqlFieldsQuery qry =
        new SqlFieldsQuery("select name, duration from Call where (_key=? and duration=?) or (_key=?)");

    qry.setArgs(firstKey, 100, secondKey);

    IgniteCache<CallKey, Call> callCache = grid(NODE_CLIENT).cache(CACHE_CALL);

    List<List<?>> rows = runQueryEnsureUnicast(callCache, qry, 2);

    assertEquals(2, rows.size());

    Call firstCall = callCache.get(firstKey);

    checkResultsRow(rows, 0, firstCall.name, firstCall.duration);

    Call secondCall = callCache.get(secondKey);

    checkResultsRow(rows, 1, secondCall.name, secondCall.duration);
}
/**
 * Passes a string argument for the integer {@code _key}: exactly one 'query executed' event is
 * expected and the row must match the person stored under key 5.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQueryKeyTypeConversionParameter() throws Exception {
    IgniteCache<Integer, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON);

    // Pass string argument to expression with integer.
    List<List<?>> result = runQueryEnsureUnicast(cache,
        new SqlFieldsQuery("select name, age from Person where _key = ?")
            .setArgs("5"), 1);

    assertEquals(1, result.size());

    Person person = cache.get(5);

    // Use the shared row helper like the other tests do; it also verifies the column count.
    checkResultsRow(result, 0, person.name, person.age);
}
/**
 * Compares the integer {@code _key} with a string literal: exactly one 'query executed' event is
 * expected and the row must match the person stored under key 5.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQueryKeyTypeConversionConstant() throws Exception {
    IgniteCache<Integer, Person> cache = grid(NODE_CLIENT).cache(CACHE_PERSON);

    // Use string within expression against integer key.
    List<List<?>> result = runQueryEnsureUnicast(cache,
        new SqlFieldsQuery("select name, age from Person where _key = '5'"), 1);

    assertEquals(1, result.size());

    Person person = cache.get(5);

    // Use the shared row helper like the other tests do; it also verifies the column count.
    checkResultsRow(result, 0, person.name, person.age);
}
/**
 * Passes a string argument for the integer affinity key {@code personId}: exactly one
 * 'query executed' event is expected and both calls of person 100 must be returned.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQueryAffinityKeyTypeConversionParameter() throws Exception {
    // String argument against an integer column.
    SqlFieldsQuery qry =
        new SqlFieldsQuery("select id, name, duration from Call where personId=? order by id");

    qry.setArgs("100");

    IgniteCache<CallKey, Call> callCache = grid(NODE_CLIENT).cache(CACHE_CALL);

    List<List<?>> rows = runQueryEnsureUnicast(callCache, qry, 1);

    assertEquals(2, rows.size());

    checkResultsRow(rows, 0, 1, "caller1", 100);
    checkResultsRow(rows, 1, 2, "caller2", 200);
}
/**
 * Compares the integer affinity key {@code personId} with a string literal: exactly one
 * 'query executed' event is expected and both calls of person 100 must be returned.
 *
 * @throws Exception If failed.
 */
@Test
public void testUnicastQueryAffinityKeyTypeConversionConstant() throws Exception {
    // String literal against an integer column.
    SqlFieldsQuery qry =
        new SqlFieldsQuery("select id, name, duration from Call where personId='100' order by id");

    IgniteCache<CallKey, Call> callCache = grid(NODE_CLIENT).cache(CACHE_CALL);

    List<List<?>> rows = runQueryEnsureUnicast(callCache, qry, 1);

    assertEquals(2, rows.size());

    checkResultsRow(rows, 0, 1, "caller1", 100);
    checkResultsRow(rows, 1, 2, "caller2", 200);
}
/**
 * {@code _key=?} joined through OR with a condition on a regular field: the query is run through
 * {@code runQueryEnsureBroadcast} and must return half of the cache entries.
 *
 * @throws Exception If failed.
 */
@Test
public void testBroadcastQuerySelectKeyEqualsOrFieldParameter() throws Exception {
    SqlFieldsQuery qry = new SqlFieldsQuery("select name, duration from Call where _key=? or duration=?");

    qry.setArgs(new CallKey(5, 1), 100);

    IgniteCache<CallKey, Call> callCache = grid(NODE_CLIENT).cache(CACHE_CALL);

    List<List<?>> rows = runQueryEnsureBroadcast(callCache, qry);

    int expRows = callCache.size() / 2;

    assertEquals(expRows, rows.size());
}
/**
 * Creates a temporary {@code UUID -> UUID} cache and checks that every entry can be found when
 * its key is passed to the query as a 16-byte array.
 *
 * @throws Exception If failed.
 */
@Test
public void testUuidKeyAsByteArrayParameter() throws Exception {
    CacheConfiguration<UUID, UUID> ccfg = new CacheConfiguration<>("uuidCache");

    ccfg.setCacheMode(CacheMode.PARTITIONED);
    ccfg.setIndexedTypes(UUID.class, UUID.class);

    IgniteCache<UUID, UUID> uuidCache = grid(NODE_CLIENT).createCache(ccfg);

    try {
        List<UUID> ids = new ArrayList<>();

        // Each entry maps a random UUID to itself.
        while (ids.size() < 10) {
            UUID id = UUID.randomUUID();

            uuidCache.put(id, id);

            ids.add(id);
        }

        for (UUID id : ids) {
            byte[] keyBytes = convertUuidToByteArray(id);

            SqlFieldsQuery qry = new SqlFieldsQuery("select _val from UUID where _key = ?").setArgs(keyBytes);

            List<List<?>> rows = uuidCache.query(qry).getAll();

            assertEquals(1, rows.size());
            assertEquals(id, rows.get(0).get(0));
        }
    }
    finally {
        // The cache is only needed by this test.
        uuidCache.destroy();
    }
}
/**
 * Creates a temporary {@code Date -> Date} cache and checks that every entry can be found when
 * its key is passed to the query as a {@link Timestamp}.
 *
 * @throws Exception If failed.
 */
@Test
public void testDateKeyAsTimestampParameter() throws Exception {
    CacheConfiguration<Date, Date> ccfg = new CacheConfiguration<>("dateCache");

    ccfg.setCacheMode(CacheMode.PARTITIONED);
    ccfg.setIndexedTypes(Date.class, Date.class);

    IgniteCache<Date, Date> dateCache = grid(NODE_CLIENT).createCache(ccfg);

    try {
        DateFormat fmt = new SimpleDateFormat("dd/MM/yyyy");

        List<Date> dates = new ArrayList<>();

        // One entry per day of June 2017, each date mapped to itself.
        for (int day = 1; day <= 30; day++) {
            Date date = fmt.parse(String.format("%02d/06/2017", day));

            dateCache.put(date, date);

            dates.add(date);
        }

        for (Date date : dates) {
            Timestamp ts = new Timestamp(date.getTime());

            SqlFieldsQuery qry = new SqlFieldsQuery("select _val from Date where _key = ?").setArgs(ts);

            List<List<?>> rows = dateCache.query(qry).getAll();

            assertEquals(1, rows.size());
            assertEquals(date, rows.get(0).get(0));
        }
    }
    finally {
        // The cache is only needed by this test.
        dateCache.destroy();
    }
}
/**
 * Serializes a UUID into 16 bytes: the most significant 8 bytes first, then the least
 * significant 8 bytes, each written in the buffer's default byte order.
 *
 * @param val UUID to convert.
 * @return Newly allocated 16-byte array.
 */
private byte[] convertUuidToByteArray(UUID val) {
    assert val != null;

    return ByteBuffer.allocate(16)
        .putLong(val.getMostSignificantBits())
        .putLong(val.getLeastSignificantBits())
        .array();
}
/**
 * Populates the person and call caches through the client grid: one person per partition of the
 * person cache (keys {@code 0..partitions-1}) and two calls for every person.
 */
private void fillCaches() {
    IgniteCache<Integer, Person> persons = grid(NODE_CLIENT).cache(CACHE_PERSON);
    IgniteCache<CallKey, Call> calls = grid(NODE_CLIENT).cache(CACHE_CALL);

    String[] names = {"John", "Bob", "James", "David", "Chuck"};

    int total = affinity(persons).partitions();

    for (int personId = 0; personId < total; personId++) {
        // Names cycle through the array; age is 20 plus the position of the name.
        int nameIdx = personId % names.length;

        persons.put(personId, new Person(names[nameIdx], 20 + nameIdx));

        // Each person gets 2 calls.
        calls.put(new CallKey(personId, 1), new Call("caller1", 100));
        calls.put(new CallKey(personId, 2), new Call("caller2", 200));
    }
}
/**
 * Asserts that the given row of a result set holds exactly the expected column values.
 *
 * @param results Query result set.
 * @param rowId Index of the row to check; must be within the result set.
 * @param expected Expected column values, in column order.
 * @throws Exception If failed.
 */
private void checkResultsRow(List<List<?>> results, int rowId, Object ... expected) throws Exception {
    assertTrue(results.size() > rowId);

    List<?> actual = results.get(rowId);

    assertEquals(expected.length, actual.size());

    int col = 0;

    for (Object exp : expected)
        assertEquals(exp, actual.get(col++));
}
/**
 * Runs the query and checks that it produced exactly {@code nodeCnt} 'query executed' events on
 * the grids observed by {@code EventCounter}.
 *
 * @param cache Cache to run the query on.
 * @param qry Query under test.
 * @param nodeCnt Expected number of 'query executed' events for the query.
 * @return Result set of the query under test.
 * @throws Exception If failed.
 */
private List<List<?>> runQueryEnsureUnicast(IgniteCache<?,?> cache, SqlFieldsQuery qry, int nodeCnt) throws Exception {
    try (EventCounter counter = new EventCounter(nodeCnt)) {
        List<List<?>> rows = cache.query(qry).getAll();

        // Broadcast 'marker' query: once its events arrive, the events of the query above
        // are expected to have been received as well.
        SqlFieldsQuery marker = new SqlFieldsQuery(MessageFormat.format(FINAL_QRY, cache.getName()));

        marker.setArgs(FINAL_QRY_PARAM);

        cache.query(marker).getAll();

        // Wait for all events of the marker query, then verify the counter.
        counter.await();

        return rows;
    }
}
/**
 * Runs the query and waits until {@link #NODE_COUNT} 'query executed' events have been observed by
 * listeners registered on grids {@code 0..NODE_COUNT-1}.
 * <p>
 * The listeners are always removed again, even if the query throws or the wait times out, so a
 * failure here cannot leave stale listeners behind for subsequent tests.
 *
 * @param cache Cache to run the query on.
 * @param qry Query under test.
 * @return Result set of the query.
 * @throws Exception If the query fails or the expected events do not arrive within 5 seconds.
 */
private List<List<?>> runQueryEnsureBroadcast(IgniteCache<?, ?> cache, SqlFieldsQuery qry) throws Exception {
    final CountDownLatch execLatch = new CountDownLatch(NODE_COUNT);

    final IgnitePredicate<Event> pred = new IgnitePredicate<Event>() {
        @Override public boolean apply(Event evt) {
            assert evt instanceof CacheQueryExecutedEvent;

            CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt;

            assertNotNull(qe.clause());

            execLatch.countDown();

            // Keep listening.
            return true;
        }
    };

    try {
        for (int i = 0; i < NODE_COUNT; i++)
            grid(i).events().localListen(pred, EVT_CACHE_QUERY_EXECUTED);

        List<List<?>> result = cache.query(qry).getAll();

        assertTrue(execLatch.await(5000, MILLISECONDS));

        return result;
    }
    finally {
        // Unregister on every exit path; stopping a listener that was never registered is harmless.
        for (int i = 0; i < NODE_COUNT; i++)
            grid(i).events().stopLocalListen(pred);
    }
}
/**
 * Listens for 'query executed' events on grids {@code 0..NODE_COUNT-1} from construction until
 * {@link #close()}. Events of the marker query (first argument equal to {@link #FINAL_QRY_PARAM})
 * release the latch; every other event on the person or call cache decrements the counter.
 */
private class EventCounter implements AutoCloseable {
    /** Remaining number of expected non-marker events; must be zero when {@link #await()} is called. */
    final AtomicInteger cnt;

    /** Counted down once per marker query event. */
    final CountDownLatch execLatch;

    /** Listener registered on every observed grid. */
    final IgnitePredicate<Event> pred = new IgnitePredicate<Event>() {
        @Override public boolean apply(Event evt) {
            assert evt instanceof CacheQueryExecutedEvent;

            CacheQueryExecutedEvent qryEvt = (CacheQueryExecutedEvent)evt;

            String cacheName = qryEvt.cacheName();

            assert cacheName != null;

            boolean tracked = cacheName.equals(CACHE_PERSON) || cacheName.equals(CACHE_CALL);

            if (tracked) {
                assertNotNull(qryEvt.clause());

                Object[] args = qryEvt.arguments();

                // The marker query is recognized by its first argument.
                boolean marker = args != null && args.length > 0 && FINAL_QRY_PARAM.equals(args[0]);

                if (marker)
                    execLatch.countDown();
                else
                    cnt.decrementAndGet();
            }

            // Always keep listening; events of other caches are ignored.
            return true;
        }
    };

    /**
     * Creates the counter and starts listening on all observed grids.
     *
     * @param cnt Expected number of non-marker 'query executed' events.
     */
    private EventCounter(int cnt) {
        this.cnt = new AtomicInteger(cnt);

        execLatch = new CountDownLatch(NODE_COUNT);

        int idx = 0;

        while (idx < NODE_COUNT) {
            grid(idx).events().localListen(pred, EVT_CACHE_QUERY_EXECUTED);

            idx++;
        }
    }

    /**
     * Waits up to 5 seconds for {@link #NODE_COUNT} marker events and then asserts that the
     * expected number of non-marker events has been seen.
     *
     * @throws Exception If failed.
     */
    public void await() throws Exception {
        boolean markersSeen = execLatch.await(5000, MILLISECONDS);

        assertTrue(markersSeen);

        assertEquals(0, cnt.get());
    }

    /** Stops listening on all observed grids. */
    @Override public void close() throws Exception {
        int idx = 0;

        while (idx < NODE_COUNT) {
            grid(idx).events().stopLocalListen(pred);

            idx++;
        }
    }
}
/** Value type of the person cache: a name and an age. */
private static class Person {
    /** Person name. */
    private String name;

    /** Person age. */
    private int age;

    /**
     * @param name Person name.
     * @param age Person age.
     */
    public Person(String name, int age) {
        this.age = age;
        this.name = name;
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        int nameHash = name.hashCode();

        return nameHash ^ age;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (o == this)
            return true;

        if (o instanceof Person) {
            Person that = (Person)o;

            return name.equals(that.name) && age == that.age;
        }

        return false;
    }
}
/** Composite key of the call cache: the person id (annotated as affinity key) plus a call id. */
private static class CallKey {
    /** Id of the person the call belongs to. */
    @AffinityKeyMapped
    private int personId;

    /** Call id. */
    private int id;

    /**
     * @param personId Id of the person the call belongs to.
     * @param id Call id.
     */
    private CallKey(int personId, int id) {
        this.id = id;
        this.personId = personId;
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        return personId ^ id;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (o == this)
            return true;

        if (o instanceof CallKey) {
            CallKey that = (CallKey)o;

            return personId == that.personId && id == that.id;
        }

        return false;
    }
}
/** Value type of the call cache: a caller name and a duration. */
private static class Call {
    /** Caller name. */
    private String name;

    /** Call duration. */
    private int duration;

    /**
     * @param name Caller name.
     * @param duration Call duration.
     */
    public Call(String name, int duration) {
        this.duration = duration;
        this.name = name;
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        int nameHash = name.hashCode();

        return nameHash ^ duration;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (o == this)
            return true;

        if (o instanceof Call) {
            Call that = (Call)o;

            return name.equals(that.name) && duration == that.duration;
        }

        return false;
    }
}
}