blob: 42b2ccf3dc2a07081db4b3203769f12aa24ddae6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.client;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryTypeConfiguration;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import static org.junit.Assert.*;
/**
 * Functional tests for the query API of the thin client.
 */
public class FunctionalQueryTest {
    /** Timeout applied to every test method. */
    @Rule
    public Timeout globalTimeout = new Timeout((int) GridTestUtils.DFLT_TEST_TIMEOUT);

    /**
     * Loads 100 {@link Person} entries into the default cache and verifies that scan, SQL and SQL fields queries
     * (the latter in both lazy and non-lazy mode) return exactly the entries whose ID is not less than the
     * chosen minimal ID.
     * <p>
     * Tested API:
     * <ul>
     * <li>{@link ClientCache#query(Query)}</li>
     * </ul>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testQueries() throws Exception {
        IgniteConfiguration srvCfg = Config.getServerConfiguration();

        // Thin clients do not support peer class loading, so the server has to know this class (and
        // SerializedLambda) in advance in order to deserialize the ScanQuery filter.
        List<BinaryTypeConfiguration> binTypes = Arrays.asList(
            new BinaryTypeConfiguration(getClass().getName()),
            new BinaryTypeConfiguration(SerializedLambda.class.getName())
        );

        BinaryConfiguration binCfg = new BinaryConfiguration().setTypeConfigurations(binTypes);

        srvCfg.setBinaryConfiguration(binCfg);

        try (Ignite ignored = Ignition.start(srvCfg);
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCache<Integer, Person> cache = client.getOrCreateCache(Config.DEFAULT_CACHE_NAME);

            Map<Integer, Person> data = IntStream.rangeClosed(1, 100)
                .boxed()
                .collect(Collectors.toMap(id -> id, id -> new Person(id, String.format("Person %s", id))));

            cache.putAll(data);

            int total = data.size();
            int minId = total / 2 + 1;
            int pageSize = (total - minId) / 3;
            int expSize = total - minId + 1; // Number of entries the queries must return.

            // Entries every query is expected to return.
            Map<Integer, Person> exp = data.entrySet().stream()
                .filter(entry -> entry.getKey() >= minId)
                .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));

            // Scan query first, SQL query second.
            Query<Cache.Entry<Integer, Person>> scanQry =
                new ScanQuery<Integer, Person>((key, person) -> person.getId() >= minId).setPageSize(pageSize);

            Query<Cache.Entry<Integer, Person>> sqlQry =
                new SqlQuery<Integer, Person>(Person.class, "id >= ?").setArgs(minId).setPageSize(pageSize);

            Collection<Query<Cache.Entry<Integer, Person>>> queries = Arrays.asList(scanQry, sqlQry);

            for (Query<Cache.Entry<Integer, Person>> qry : queries)
                checkEntryQuery(cache, qry, expSize, exp);

            for (boolean lazy : new boolean[] {true, false})
                checkSqlFieldsQuery(cache, minId, pageSize, expSize, exp, lazy);
        }
    }

    /**
     * Runs an entry-returning query and compares both the number of rows and their content with the expectation.
     *
     * @param cache Cache.
     * @param qry Query to execute.
     * @param expSize The size of the expected results.
     * @param exp Expected results.
     */
    private static void checkEntryQuery(ClientCache<Integer, Person> cache, Query<Cache.Entry<Integer, Person>> qry,
        int expSize, Map<Integer, Person> exp) {
        String qryName = qry.getClass().getSimpleName();

        try (QueryCursor<Cache.Entry<Integer, Person>> cur = cache.query(qry)) {
            List<Cache.Entry<Integer, Person>> rows = cur.getAll();

            assertEquals(String.format("Unexpected number of rows from %s", qryName), expSize, rows.size());

            Map<Integer, Person> act = rows.stream()
                .collect(Collectors.toMap(row -> row.getKey(), row -> row.getValue()));

            assertEquals(String.format("unexpected rows from %s", qryName), exp, act);
        }
    }

    /**
     * Runs an SQL fields query selecting {@code id} and {@code name} of all persons with ID not less than
     * {@code minId} and compares the rows with the expectation.
     *
     * @param cache Cache.
     * @param minId Minimal ID.
     * @param pageSize Page size.
     * @param expSize The size of the expected results.
     * @param exp Expected results.
     * @param lazy Lazy mode flag.
     */
    private void checkSqlFieldsQuery(ClientCache<Integer, Person> cache, int minId, int pageSize, int expSize,
        Map<Integer, Person> exp, boolean lazy) {
        SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select id, name from Person where id >= ?")
            .setArgs(minId)
            .setPageSize(pageSize)
            .setLazy(lazy);

        try (QueryCursor<List<?>> cur = cache.query(fieldsQry)) {
            List<List<?>> rows = cur.getAll();

            assertEquals(expSize, rows.size());

            Map<Integer, Person> act = rows.stream()
                .collect(Collectors.toMap(FunctionalQueryTest::rowId, FunctionalQueryTest::rowPerson));

            assertEquals(exp, act);
        }
    }

    /**
     * @param row Row of an {@code id, name} fields query.
     * @return ID parsed from the string form of the first column.
     */
    private static int rowId(List<?> row) {
        return Integer.parseInt(row.get(0).toString());
    }

    /**
     * @param row Row of an {@code id, name} fields query.
     * @return Person built from the first (ID) and the second (name) columns.
     */
    private static Person rowPerson(List<?> row) {
        return new Person(rowId(row), row.get(1).toString());
    }

    /**
     * Creates the {@code Person} table through the client, inserts ten rows and reads them back, the last
     * select being executed with the page size of one row.
     * <p>
     * Tested API:
     * <ul>
     * <li>{@link IgniteClient#query(SqlFieldsQuery)}</li>
     * </ul>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testSql() throws Exception {
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             Ignite ignored2 = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER))
        ) {
            String createSql = String.format(
                "CREATE TABLE IF NOT EXISTS Person (id INT PRIMARY KEY, name VARCHAR) WITH \"VALUE_TYPE=%s\"",
                Person.class.getName()
            );

            client.query(new SqlFieldsQuery(createSql).setSchema("PUBLIC")).getAll();

            final int keyCnt = 10;

            for (int id = 0; id < keyCnt; id++) {
                Person person = new Person(id, "Person " + id);

                SqlFieldsQuery insertQry = new SqlFieldsQuery("INSERT INTO Person(id, name) VALUES(?, ?)")
                    .setArgs(person.getId(), person.getName())
                    .setSchema("PUBLIC");

                client.query(insertQry).getAll();
            }

            // Single-row lookup: take the first column of the first returned row.
            SqlFieldsQuery nameQry = new SqlFieldsQuery("SELECT name from Person WHERE id=?")
                .setArgs(1)
                .setSchema("PUBLIC");

            List<List<?>> nameRows = client.query(nameQry).getAll();

            Object cachedName = nameRows.iterator().next().iterator().next();

            assertEquals("Person 1", cachedName);

            // Page size of one row makes the cursor fetch the result page by page.
            SqlFieldsQuery allQry = new SqlFieldsQuery("SELECT * from Person WHERE id >= ?")
                .setSchema("PUBLIC")
                .setArgs(0)
                .setPageSize(1);

            List<List<?>> rows = client.query(allQry).getAll();

            assertEquals(keyCnt, rows.size());
        }
    }

    /**
     * Checks that SQL fields, scan and SQL queries against a freshly created table yield a non-null empty
     * result.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testGettingEmptyResultWhenQueryingEmptyTable() throws Exception {
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(Config.SERVER))
        ) {
            final String tblName = "Person";

            String createSql = String.format(
                "CREATE TABLE IF NOT EXISTS " + tblName
                    + " (id INT PRIMARY KEY, name VARCHAR) WITH \"VALUE_TYPE=%s\"",
                Person.class.getName()
            );

            client.query(new SqlFieldsQuery(createSql).setSchema("PUBLIC")).getAll();

            String selectAllSql = "SELECT * FROM " + tblName;

            // IgniteClient#query() API
            assertEmptyResult(client.query(new SqlFieldsQuery(selectAllSql)).getAll());

            // ClientCache#query(SqlFieldsQuery) API
            ClientCache<Integer, Person> cache = client.cache("SQL_PUBLIC_" + tblName.toUpperCase());

            assertEmptyResult(cache.query(new SqlFieldsQuery(selectAllSql)).getAll());

            // ClientCache#query(ScanQuery) and ClientCache#query(SqlQuery) API
            Query<Cache.Entry<Integer, Person>> scanQry = new ScanQuery<Integer, Person>();
            Query<Cache.Entry<Integer, Person>> sqlQry = new SqlQuery<Integer, Person>(Person.class, "1 = 1");

            Collection<Query<Cache.Entry<Integer, Person>>> queries = Arrays.asList(scanQry, sqlQry);

            for (Query<Cache.Entry<Integer, Person>> qry : queries) {
                try (QueryCursor<Cache.Entry<Integer, Person>> cur = cache.query(qry)) {
                    assertEmptyResult(cur.getAll());
                }
            }
        }
    }

    /**
     * Asserts that the query result exists and holds no rows.
     *
     * @param res Query result.
     */
    private static void assertEmptyResult(List<?> res) {
        assertNotNull(res);
        assertEquals(0, res.size());
    }

    /**
     * @return Client configuration pointing at {@code Config.SERVER} with send and receive buffer sizes set
     *      to zero.
     */
    private static ClientConfiguration getClientConfiguration() {
        ClientConfiguration cfg = new ClientConfiguration().setAddresses(Config.SERVER);

        return cfg.setSendBufferSize(0).setReceiveBufferSize(0);
    }
}