| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.query; |
| |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.Ignition; |
| import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; |
| import org.apache.ignite.cache.query.FieldsQueryCursor; |
| import org.apache.ignite.cache.query.QueryRetryException; |
| import org.apache.ignite.cache.query.SqlFieldsQuery; |
| import org.apache.ignite.cache.query.annotations.QuerySqlField; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.junit.Test; |
| |
| /** |
| * Tests for query execution check cases for correct table lock/unlock. |
| */ |
| public abstract class AbstractQueryTableLockAndConnectionPoolSelfTest extends AbstractIndexingCommonTest { |
| /** Keys count. */ |
| private static final int KEY_CNT = 500; |
| |
| /** Base query argument. */ |
| private static final int BASE_QRY_ARG = 50; |
| |
| /** Size for small pages. */ |
| private static final int PAGE_SIZE_SMALL = 12; |
| |
| /** Test duration. */ |
| private static final long TEST_DUR = GridTestUtils.SF.applyLB(10_000, 3_000); |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| stopAllGrids(); |
| } |
| |
| /** |
| * Test local query execution. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testSingleNode() throws Exception { |
| checkSingleNode(1); |
| } |
| |
| /** |
| * Test local query execution. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testSingleNodeWithParallelism() throws Exception { |
| checkSingleNode(4); |
| } |
| |
| /** |
| * Test query execution with multiple topology nodes. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMultipleNodes() throws Exception { |
| checkMultipleNodes(1); |
| } |
| |
| /** |
| * Test query execution with multiple topology nodes with query parallelism. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMultipleNodesWithParallelism() throws Exception { |
| checkMultipleNodes(4); |
| } |
| |
| /** |
| * Test DDL operation on table with high load queries. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testSingleNodeTablesLockQueryAndDDLMultithreaded() throws Exception { |
| final Ignite srv = startGrid(0); |
| |
| populateBaseQueryData(srv, 1); |
| |
| checkTablesLockQueryAndDDLMultithreaded(srv); |
| |
| checkTablesLockQueryAndDropColumnMultithreaded(srv); |
| } |
| |
| /** |
| * Test DDL operation on table with high load queries. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testSingleNodeWithParallelismTablesLockQueryAndDDLMultithreaded() throws Exception { |
| final Ignite srv = startGrid(0); |
| |
| populateBaseQueryData(srv, 4); |
| |
| checkTablesLockQueryAndDDLMultithreaded(srv); |
| |
| checkTablesLockQueryAndDropColumnMultithreaded(srv); |
| } |
| |
| /** |
| * Test DDL operation on table with high load queries. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMultipleNodesWithTablesLockQueryAndDDLMultithreaded() throws Exception { |
| Ignite srv0 = startGrid(0); |
| Ignite srv1 = startGrid(1); |
| startGrid(2); |
| |
| Ignite cli; |
| |
| try { |
| Ignition.setClientMode(true); |
| |
| cli = startGrid(3); |
| } |
| finally { |
| Ignition.setClientMode(false); |
| } |
| |
| populateBaseQueryData(srv0, 1); |
| |
| checkTablesLockQueryAndDDLMultithreaded(srv0); |
| checkTablesLockQueryAndDDLMultithreaded(srv1); |
| checkTablesLockQueryAndDDLMultithreaded(cli); |
| |
| checkTablesLockQueryAndDropColumnMultithreaded(srv0); |
| checkTablesLockQueryAndDropColumnMultithreaded(srv1); |
| // TODO: +++ DDL DROP COLUMN CacheContext == null on CLI |
| // checkTablesLockQueryAndDropColumnMultithreaded(cli); |
| } |
| |
| /** |
| * Test DDL operation on table with high load queries. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testMultipleNodesWithParallelismTablesLockQueryAndDDLMultithreaded() throws Exception { |
| Ignite srv0 = startGrid(0); |
| Ignite srv1 = startGrid(1); |
| startGrid(2); |
| |
| Ignite cli; |
| |
| try { |
| Ignition.setClientMode(true); |
| |
| cli = startGrid(3); |
| } |
| finally { |
| Ignition.setClientMode(false); |
| } |
| |
| populateBaseQueryData(srv0, 4); |
| |
| checkTablesLockQueryAndDDLMultithreaded(srv0); |
| checkTablesLockQueryAndDDLMultithreaded(srv1); |
| checkTablesLockQueryAndDDLMultithreaded(cli); |
| |
| checkTablesLockQueryAndDropColumnMultithreaded(srv0); |
| checkTablesLockQueryAndDropColumnMultithreaded(srv1); |
| // TODO: +++ DDL DROP COLUMN CacheContext == null on CLI |
| // checkTablesLockQueryAndDropColumnMultithreaded(cli); |
| } |
| |
| /** |
| * Test release reserved partition after query complete (results is bigger than one page). |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testReleasePartitionReservationSeveralPagesResults() throws Exception { |
| checkReleasePartitionReservation(PAGE_SIZE_SMALL); |
| } |
| |
| /** |
| * Test release reserved partition after query complete (results is placed on one page). |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testReleasePartitionReservationOnePageResults() throws Exception { |
| checkReleasePartitionReservation(KEY_CNT); |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testFetchFromRemovedTable() throws Exception { |
| Ignite srv = startGrid(0); |
| |
| execute(srv, "CREATE TABLE TEST (id int primary key, val int)"); |
| |
| for (int i = 0; i < 10; ++i) |
| execute(srv, "INSERT INTO TEST VALUES (" + i + ", " + i + ")"); |
| |
| FieldsQueryCursor<List<?>> cur = execute(srv, new SqlFieldsQuery("SELECT * from TEST").setPageSize(1)); |
| |
| Iterator<List<?>> it = cur.iterator(); |
| |
| it.next(); |
| |
| execute(srv, "DROP TABLE TEST"); |
| |
| try { |
| while (it.hasNext()) |
| it.next(); |
| |
| if (lazy()) |
| fail("Retry exception must be thrown"); |
| } |
| catch (Exception e) { |
| if (!lazy()) { |
| log.error("In lazy=false mode the query must be finished successfully", e); |
| |
| fail("In lazy=false mode the query must be finished successfully"); |
| } |
| else |
| assertNotNull(X.cause(e, QueryRetryException.class)); |
| } |
| } |
| |
| /** |
| * @param node Ignite node to execute query. |
| * @throws Exception If failed. |
| */ |
| private void checkTablesLockQueryAndDDLMultithreaded(final Ignite node) throws Exception { |
| final AtomicBoolean end = new AtomicBoolean(false); |
| |
| final int qryThreads = 10; |
| |
| // Do many concurrent queries. |
| IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { |
| @Override public void run() { |
| while(!end.get()) { |
| try { |
| FieldsQueryCursor<List<?>> cursor = execute(node, new SqlFieldsQuery( |
| "SELECT pers.id, pers.name " + |
| "FROM (SELECT DISTINCT p.id, p.name " + |
| "FROM \"pers\".PERSON as p) as pers " + |
| "JOIN \"pers\".PERSON p on p.id = pers.id " + |
| "JOIN (SELECT t.persId as persId, SUM(t.time) totalTime " + |
| "FROM \"persTask\".PersonTask as t GROUP BY t.persId) as task ON task.persId = pers.id") |
| .setLazy(lazy()) |
| .setPageSize(PAGE_SIZE_SMALL)); |
| |
| cursor.getAll(); |
| } |
| catch (Exception e) { |
| if(X.cause(e, QueryRetryException.class) == null) { |
| log.error("Unexpected exception", e); |
| |
| fail("Unexpected exception. " + e); |
| } |
| else if (!lazy()) { |
| log.error("Unexpected exception", e); |
| |
| fail("Unexpected QueryRetryException."); |
| } |
| } |
| } |
| } |
| }, qryThreads, "usr-qry"); |
| |
| long tEnd = U.currentTimeMillis() + TEST_DUR; |
| |
| while (U.currentTimeMillis() < tEnd) { |
| execute(node, new SqlFieldsQuery("CREATE INDEX \"pers\".PERSON_NAME ON \"pers\".Person (name asc)")).getAll(); |
| execute(node, new SqlFieldsQuery("DROP INDEX \"pers\".PERSON_NAME")).getAll(); |
| } |
| |
| // Test is OK in case DDL operations is passed on hi load queries pressure. |
| end.set(true); |
| fut.get(); |
| } |
| |
| /** |
| * @param node Ignite node to execute query. |
| * @throws Exception If failed. |
| */ |
| private void checkTablesLockQueryAndDropColumnMultithreaded(final Ignite node) throws Exception { |
| final AtomicBoolean end = new AtomicBoolean(false); |
| |
| final int qryThreads = 10; |
| |
| // Do many concurrent queries. |
| IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { |
| @Override public void run() { |
| while(!end.get()) { |
| try { |
| FieldsQueryCursor<List<?>> cursor = execute(node, new SqlFieldsQuery( |
| "SELECT pers.id, pers.name FROM \"pers\".PERSON") |
| .setLazy(lazy()) |
| .setPageSize(PAGE_SIZE_SMALL)); |
| |
| cursor.getAll(); |
| } |
| catch (Exception e) { |
| if (e.getMessage().contains("Failed to parse query. Column \"PERS.ID\" not found")) { |
| // Swallow exception when column is dropped. |
| } |
| else if(X.cause(e, QueryRetryException.class) == null) { |
| log.error("Unexpected exception", e); |
| |
| fail("Unexpected exception. " + e); |
| } |
| else if (!lazy()) { |
| log.error("Unexpected exception", e); |
| |
| fail("Unexpected QueryRetryException."); |
| } |
| } |
| } |
| } |
| }, qryThreads, "usr-qry"); |
| |
| long tEnd = U.currentTimeMillis() + TEST_DUR; |
| |
| while (U.currentTimeMillis() < tEnd) { |
| execute(node, new SqlFieldsQuery("ALTER TABLE \"pers\".Person DROP COLUMN name")).getAll(); |
| execute(node, new SqlFieldsQuery("ALTER TABLE \"pers\".Person ADD COLUMN name varchar")).getAll(); |
| } |
| |
| // Test is OK in case DDL operations is passed on hi load queries pressure. |
| end.set(true); |
| fut.get(); |
| } |
| |
| /** |
| * Test release reserved partition after query complete. |
| * In case partitions not released the `awaitPartitionMapExchange` fails by timeout. |
| * |
| * @param pageSize Results page size. |
| * @throws Exception If failed. |
| */ |
| public void checkReleasePartitionReservation(int pageSize) throws Exception { |
| Ignite srv1 = startGrid(1); |
| startGrid(2); |
| |
| populateBaseQueryData(srv1, 1); |
| |
| FieldsQueryCursor<List<?>> cursor = execute(srv1, query(0).setPageSize(pageSize)); |
| |
| cursor.getAll(); |
| |
| startGrid(3); |
| |
| awaitPartitionMapExchange(); |
| } |
| |
| /** |
| * Check local query execution. |
| * |
| * @param parallelism Query parallelism. |
| * @throws Exception If failed. |
| */ |
| public void checkSingleNode(int parallelism) throws Exception { |
| Ignite srv = startGrid(); |
| |
| populateBaseQueryData(srv, parallelism); |
| |
| checkBaseOperations(srv); |
| } |
| |
| /** |
| * Check query execution with multiple topology nodes. |
| * |
| * @param parallelism Query parallelism. |
| * @throws Exception If failed. |
| */ |
| public void checkMultipleNodes(int parallelism) throws Exception { |
| Ignite srv1 = startGrid(1); |
| Ignite srv2 = startGrid(2); |
| |
| Ignite cli; |
| |
| try { |
| Ignition.setClientMode(true); |
| |
| cli = startGrid(3); |
| } |
| finally { |
| Ignition.setClientMode(false); |
| } |
| |
| populateBaseQueryData(cli, parallelism); |
| |
| checkBaseOperations(srv1); |
| checkBaseOperations(srv2); |
| checkBaseOperations(cli); |
| |
| // Test originating node leave. |
| FieldsQueryCursor<List<?>> cursor = execute(cli, baseQuery().setPageSize(PAGE_SIZE_SMALL)); |
| |
| Iterator<List<?>> iter = cursor.iterator(); |
| |
| for (int i = 0; i < 30; i++) |
| iter.next(); |
| |
| stopGrid(3); |
| |
| // Test server node leave with active worker. |
| FieldsQueryCursor<List<?>> cursor2 = execute(srv1, baseQuery().setPageSize(PAGE_SIZE_SMALL)); |
| |
| try { |
| Iterator<List<?>> iter2 = cursor2.iterator(); |
| |
| for (int i = 0; i < 30; i++) |
| iter2.next(); |
| |
| stopGrid(2); |
| } |
| finally { |
| cursor2.close(); |
| } |
| } |
| |
| /** |
| * Check base operations. |
| * |
| * @param node Node. |
| * @throws Exception If failed. |
| */ |
| private void checkBaseOperations(Ignite node) throws Exception { |
| checkQuerySplitToSeveralMapQueries(node); |
| |
| // Get full data. |
| { |
| List<List<?>> rows = execute(node, baseQuery()).getAll(); |
| |
| assertBaseQueryResults(rows); |
| } |
| |
| // Check QueryRetryException is thrown |
| { |
| List<List<?>> rows = new ArrayList<>(); |
| |
| FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)); |
| |
| Iterator<List<?>> it = cursor.iterator(); |
| |
| for (int i = 0; i < 10; ++i) |
| rows.add(it.next()); |
| |
| execute(node, new SqlFieldsQuery("CREATE INDEX \"pers\".PERSON_NAME ON \"pers\".Person (name asc)")).getAll(); |
| execute(node, new SqlFieldsQuery("DROP INDEX \"pers\".PERSON_NAME")).getAll(); |
| |
| try { |
| while (it.hasNext()) |
| rows.add(it.next()); |
| |
| if (lazy()) |
| fail("Retry exception must be thrown"); |
| } |
| catch (Exception e) { |
| if (!lazy() || X.cause(e, QueryRetryException.class) == null) { |
| log.error("Invalid exception: ", e); |
| |
| fail("QueryRetryException is expected"); |
| } |
| } |
| } |
| |
| // Get data in several pages. |
| { |
| List<List<?>> rows = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)).getAll(); |
| |
| assertBaseQueryResults(rows); |
| } |
| |
| // Test full iteration. |
| { |
| List<List<?>> rows = new ArrayList<>(); |
| |
| FieldsQueryCursor<List<?>> cursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL)); |
| |
| for (List<?> row : cursor) |
| rows.add(row); |
| |
| cursor.close(); |
| |
| assertBaseQueryResults(rows); |
| } |
| |
| // Test partial iteration with cursor close. |
| try (FieldsQueryCursor<List<?>> partialCursor = execute(node, baseQuery().setPageSize(PAGE_SIZE_SMALL))) { |
| Iterator<List<?>> iter = partialCursor.iterator(); |
| |
| for (int i = 0; i < 30; i++) |
| iter.next(); |
| } |
| |
| // Test execution of multiple queries at a time. |
| List<Iterator<List<?>>> iters = new ArrayList<>(); |
| |
| for (int i = 0; i < 200; i++) |
| iters.add(execute(node, randomizedQuery().setPageSize(PAGE_SIZE_SMALL)).iterator()); |
| |
| while (!iters.isEmpty()) { |
| Iterator<Iterator<List<?>>> iterIter = iters.iterator(); |
| |
| while (iterIter.hasNext()) { |
| Iterator<List<?>> iter = iterIter.next(); |
| |
| int i = 0; |
| |
| while (iter.hasNext() && i < 20) { |
| iter.next(); |
| |
| i++; |
| } |
| |
| if (!iter.hasNext()) |
| iterIter.remove(); |
| } |
| } |
| |
| checkHoldQuery(node); |
| |
| checkShortQuery(node); |
| } |
| |
| /** |
| * @param node Ignite node. |
| * @throws Exception If failed. |
| */ |
| public void checkHoldQuery(Ignite node) throws Exception { |
| ArrayList rows = new ArrayList<>(); |
| |
| Iterator<List<?>> it0 = execute(node, query(BASE_QRY_ARG).setPageSize(PAGE_SIZE_SMALL)).iterator(); |
| rows.add(it0.next()); |
| |
| // Do many concurrent queries to Test full iteration. |
| GridTestUtils.runMultiThreaded(new Runnable() { |
| @Override public void run() { |
| for (int i = 0; i < 5; ++i) { |
| FieldsQueryCursor<List<?>> cursor = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1) |
| .setPageSize(PAGE_SIZE_SMALL)); |
| |
| cursor.getAll(); |
| } |
| } |
| }, 5, "test-qry"); |
| |
| // Do the same query in the same thread. |
| { |
| FieldsQueryCursor<List<?>> cursor = execute(node, query(BASE_QRY_ARG) |
| .setPageSize(PAGE_SIZE_SMALL)); |
| |
| cursor.getAll(); |
| } |
| |
| while (it0.hasNext()) |
| rows.add(it0.next()); |
| |
| assertBaseQueryResults(rows); |
| } |
| |
| /** |
| * @param node Ignite node. |
| * @throws Exception If failed. |
| */ |
| public void checkShortQuery(Ignite node) throws Exception { |
| ArrayList rows = new ArrayList<>(); |
| |
| FieldsQueryCursor<List<?>> cursor0 = execute(node, query(KEY_CNT - PAGE_SIZE_SMALL + 1).setPageSize(PAGE_SIZE_SMALL)); |
| |
| Iterator<List<?>> it = cursor0.iterator(); |
| |
| while (it.hasNext()) |
| rows.add(it.next()); |
| |
| assertQueryResults(rows, KEY_CNT - PAGE_SIZE_SMALL + 1); |
| } |
| |
| /** |
| * @param node Ignite node. |
| * @throws Exception If failed. |
| */ |
| public void checkQuerySplitToSeveralMapQueries(Ignite node) throws Exception { |
| ArrayList rows = new ArrayList<>(); |
| |
| FieldsQueryCursor<List<?>> cursor0 = execute(node, new SqlFieldsQuery( |
| "SELECT pers.id, pers.name " + |
| "FROM (SELECT DISTINCT p.id, p.name " + |
| "FROM \"pers\".PERSON as p) as pers " + |
| "JOIN \"pers\".PERSON p on p.id = pers.id " + |
| "JOIN (SELECT t.persId as persId, SUM(t.time) totalTime " + |
| "FROM \"persTask\".PersonTask as t GROUP BY t.persId) as task ON task.persId = pers.id") |
| .setPageSize(PAGE_SIZE_SMALL)); |
| |
| Iterator<List<?>> it = cursor0.iterator(); |
| |
| while (it.hasNext()) |
| rows.add(it.next()); |
| |
| assertQueryResults(rows, 0); |
| } |
| |
| /** |
| * Populate base query data. |
| * |
| * @param node Node. |
| * @param parallelism Query parallelism. |
| */ |
| private static void populateBaseQueryData(Ignite node, int parallelism) { |
| node.createCache(cacheConfiguration(parallelism, "pers", Person.class)); |
| node.createCache(cacheConfiguration(parallelism, "persTask", PersonTask.class)); |
| |
| IgniteCache<Long, Object> pers = cache(node, "pers"); |
| |
| for (long i = 0; i < KEY_CNT; i++) |
| pers.put(i, new Person(i)); |
| |
| IgniteCache<Long, Object> comp = cache(node, "persTask"); |
| |
| for (long i = 0; i < KEY_CNT; i++) |
| comp.put(i, new PersonTask(i)); |
| } |
| |
| /** |
| * @return Query with randomized argument. |
| */ |
| private static SqlFieldsQuery randomizedQuery() { |
| return query(ThreadLocalRandom.current().nextInt(KEY_CNT / 2)); |
| } |
| |
| /** |
| * @return Base query. |
| */ |
| private static SqlFieldsQuery baseQuery() { |
| return query(BASE_QRY_ARG); |
| } |
| |
| /** |
| * @param parallelism Query parallelism. |
| * @param name Cache name. |
| * @param valClass Value class. |
| * @return Default cache configuration. |
| */ |
| private static CacheConfiguration<Long, Person> cacheConfiguration(int parallelism, String name, Class valClass) { |
| return new CacheConfiguration<Long, Person>() |
| .setName(name) |
| .setIndexedTypes(Long.class, valClass) |
| .setQueryParallelism(parallelism) |
| .setAffinity(new RendezvousAffinityFunction(false, 10)); |
| } |
| |
| /** |
| * Default query. |
| * |
| * @param arg Argument. |
| * @return Query. |
| */ |
| private static SqlFieldsQuery query(long arg) { |
| return new SqlFieldsQuery( |
| "SELECT id, name FROM \"pers\".Person WHERE id >= " + arg); |
| } |
| |
| /** |
| * Assert base query results. |
| * |
| * @param rows Result rows. |
| */ |
| private static void assertBaseQueryResults(List<List<?>> rows) { |
| assertQueryResults(rows, BASE_QRY_ARG); |
| } |
| |
| /** |
| * Assert base query results. |
| * |
| * @param rows Result rows. |
| * @param resSize Result size. |
| */ |
| private static void assertQueryResults(List<List<?>> rows, int resSize) { |
| assertEquals(KEY_CNT - resSize, rows.size()); |
| |
| for (List<?> row : rows) { |
| Long id = (Long)row.get(0); |
| String name = (String)row.get(1); |
| |
| assertTrue(id >= resSize); |
| assertEquals(nameForId(id), name); |
| } |
| } |
| |
| /** |
| * Get cache for node. |
| * |
| * @param node Ignite node to get cache. |
| * @param name Cache name. |
| * @return Cache. |
| */ |
| private static IgniteCache<Long, Object> cache(Ignite node, String name) { |
| return node.cache(name); |
| } |
| |
| /** |
| * Execute query on the given cache. |
| * |
| * @param node Node. |
| * @param sql Query. |
| * @return Cursor. |
| */ |
| private FieldsQueryCursor<List<?>> execute(Ignite node, String sql) { |
| return ((IgniteEx)node).context().query().querySqlFields(new SqlFieldsQuery(sql).setLazy(lazy()), false); |
| } |
| |
| /** |
| * Execute query on the given cache. |
| * |
| * @param node Node. |
| * @param qry Query. |
| * @return Cursor. |
| */ |
| private FieldsQueryCursor<List<?>> execute(Ignite node, SqlFieldsQuery qry) { |
| return ((IgniteEx)node).context().query().querySqlFields(qry.setLazy(lazy()), false); |
| } |
| |
| /** |
| * @return Lazy mode. |
| */ |
| protected abstract boolean lazy(); |
| |
| /** |
| * Get name for ID. |
| * |
| * @param id ID. |
| * @return Name. |
| */ |
| private static String nameForId(long id) { |
| return "name-" + id; |
| } |
| |
| /** |
| * Person class. |
| */ |
| private static class Person { |
| /** ID. */ |
| @QuerySqlField(index = true) |
| private long id; |
| |
| /** Name. */ |
| @QuerySqlField |
| private String name; |
| |
| /** |
| * Constructor. |
| * |
| * @param id ID. |
| */ |
| public Person(long id) { |
| this.id = id; |
| this.name = nameForId(id); |
| } |
| |
| /** |
| * @return ID. |
| */ |
| public long id() { |
| return id; |
| } |
| |
| /** |
| * @return Name. |
| */ |
| public String name() { |
| return name; |
| } |
| } |
| |
| /** |
| * Company class. |
| */ |
| private static class PersonTask { |
| /** ID. */ |
| @QuerySqlField(index = true) |
| private long id; |
| |
| @QuerySqlField(index = true) |
| private long persId; |
| |
| /** Name. */ |
| @QuerySqlField |
| private long time; |
| |
| /** |
| * Constructor. |
| * |
| * @param id ID. |
| */ |
| public PersonTask(long id) { |
| this.id = id; |
| this.persId = id; |
| this.time = id; |
| } |
| |
| /** |
| * @return ID. |
| */ |
| public long id() { |
| return id; |
| } |
| |
| /** |
| * @return Name. |
| */ |
| public long time() { |
| return time; |
| } |
| } |
| } |