IGNITE-19586 SQL Calcite: Fix SQL/Query events - Fixes #10756.
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 2d543a9..7afc674 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -48,6 +48,7 @@
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.QueryEngineConfiguration;
+import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -98,10 +99,12 @@
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.LifecycleAware;
import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
/** */
public class CalciteQueryProcessor extends GridProcessorAdapter implements QueryEngine {
@@ -521,6 +524,16 @@
qryReg.register(qry);
+ if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
+ ctx.event().record(new SqlQueryExecutionEvent(
+ ctx.discovery().localNode(),
+ "SQL query execution.",
+ sql,
+ params,
+ SecurityUtils.securitySubjectId(ctx))
+ );
+ }
+
try {
return action.apply(qry);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 414db87..6adb151 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -35,6 +35,8 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheQueryReadEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
@@ -44,6 +46,7 @@
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
@@ -95,12 +98,14 @@
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.processors.query.calcite.util.ConvertingClosableIterator;
import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
+import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.singletonList;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader.fromJson;
@@ -116,6 +121,9 @@
private UUID locNodeId;
/** */
+ private GridKernalContext ctx;
+
+ /** */
private GridEventStorageManager evtMgr;
/** */
@@ -400,6 +408,7 @@
/** {@inheritDoc} */
@Override public void onStart(GridKernalContext ctx) {
+ this.ctx = ctx;
localNodeId(ctx.localNodeId());
exchangeManager(ctx.cache().context().exchange());
cacheObjectValueContext(ctx.query().objectContext());
@@ -647,8 +656,38 @@
Function<Object, Object> fieldConverter = (qryProps == null || qryProps.keepBinary()) ? null :
o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false, true, null);
+ Function<List<Object>, List<Object>> rowConverter = null;
+
+ // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return result to cursor.
+ if (qryProps != null && qryProps.cacheName() != null && evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+ ClusterNode locNode = ctx.discovery().localNode();
+ UUID subjId = SecurityUtils.securitySubjectId(ctx);
+
+ rowConverter = row -> {
+ evtMgr.record(new CacheQueryReadEvent<>(
+ locNode,
+ "SQL fields query result set row read.",
+ EVT_CACHE_QUERY_OBJECT_READ,
+ CacheQueryType.SQL_FIELDS.name(),
+ qryProps.cacheName(),
+ null,
+ qry.sql(),
+ null,
+ null,
+ qry.parameters(),
+ subjId,
+ null,
+ null,
+ null,
+ null,
+ row));
+
+ return row;
+ };
+ }
+
Iterator<List<?>> it = new ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
- fieldConverter);
+ fieldConverter, rowConverter);
return new ListFieldsQueryCursor<>(plan, it, ectx);
}
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
index e3d19e18..998f83b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
@@ -40,14 +40,19 @@
@Nullable private final Function<Object, Object> fieldConverter;
/** */
+ @Nullable Function<List<Object>, List<Object>> rowConverter;
+
+ /** */
public ConvertingClosableIterator(
Iterator<Row> it,
ExecutionContext<Row> ectx,
- @Nullable Function<Object, Object> fieldConverter
+ @Nullable Function<Object, Object> fieldConverter,
+ @Nullable Function<List<Object>, List<Object>> rowConverter
) {
this.it = it;
rowHnd = ectx.rowHandler();
this.fieldConverter = fieldConverter;
+ this.rowConverter = rowConverter;
}
/**
@@ -70,7 +75,7 @@
for (int i = 0; i < rowSize; i++)
res.add(fieldConverter == null ? rowHnd.get(i, next) : fieldConverter.apply(rowHnd.get(i, next)));
- return res;
+ return rowConverter == null ? res : rowConverter.apply(res);
}
/**
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 064fba3..e67fb63 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -24,13 +24,24 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.SqlConfiguration;
+import org.apache.ignite.events.CacheQueryExecutedEvent;
+import org.apache.ignite.events.CacheQueryReadEvent;
+import org.apache.ignite.events.SqlQueryExecutionEvent;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.Test;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.cleanPerformanceStatisticsDir;
import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics;
@@ -40,6 +51,34 @@
* Test SQL diagnostic tools.
*/
public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName)
+ .setSqlConfiguration(new SqlConfiguration().setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration()))
+ .setIncludeEventTypes(EVT_SQL_QUERY_EXECUTION, EVT_CACHE_QUERY_EXECUTED, EVT_CACHE_QUERY_OBJECT_READ);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGrids(nodeCount());
+
+ client = startClientGrid();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
/** */
@Override protected int nodeCount() {
return 2;
@@ -125,4 +164,64 @@
assertTrue("Query reads expected on nodes: " + readsNodes, readsNodes.isEmpty());
assertEquals(Collections.singleton(lastQryId.get()), readsQueries);
}
+
+ /** */
+ @Test
+ public void testSqlEvents() {
+ sql("CREATE TABLE test_event (a INT) WITH cache_name=\"test_event\"");
+
+ AtomicIntegerArray evtsSqlExec = new AtomicIntegerArray(nodeCount());
+ AtomicIntegerArray evtsQryExec = new AtomicIntegerArray(nodeCount());
+ AtomicIntegerArray evtsQryRead = new AtomicIntegerArray(nodeCount());
+ for (int i = 0; i < nodeCount(); i++) {
+ int n = i;
+ grid(i).events().localListen(e -> {
+ evtsSqlExec.incrementAndGet(n);
+
+ assertTrue(e instanceof SqlQueryExecutionEvent);
+ assertTrue(((SqlQueryExecutionEvent)e).text().toLowerCase().contains("test_event"));
+
+ return true;
+ }, EVT_SQL_QUERY_EXECUTION);
+
+ grid(i).events().localListen(e -> {
+ evtsQryExec.incrementAndGet(n);
+
+ assertTrue(e instanceof CacheQueryExecutedEvent);
+ assertEquals("test_event", ((CacheQueryExecutedEvent<?, ?>)e).cacheName());
+ assertTrue(((CacheQueryExecutedEvent<?, ?>)e).clause().toLowerCase().contains("test_event"));
+ assertEquals(SQL_FIELDS.name(), ((CacheQueryExecutedEvent<?, ?>)e).queryType());
+ assertEquals(3, ((CacheQueryExecutedEvent<?, ?>)e).arguments().length);
+ assertNull(((CacheQueryExecutedEvent<?, ?>)e).scanQueryFilter());
+ assertNull(((CacheQueryExecutedEvent<?, ?>)e).continuousQueryFilter());
+
+ return true;
+ }, EVT_CACHE_QUERY_EXECUTED);
+
+ grid(i).events().localListen(e -> {
+ evtsQryRead.incrementAndGet(n);
+
+ assertTrue(e instanceof CacheQueryReadEvent);
+ assertEquals(SQL_FIELDS.name(), ((CacheQueryReadEvent<?, ?>)e).queryType());
+ assertTrue(((CacheQueryReadEvent<?, ?>)e).clause().toLowerCase().contains("test_event"));
+ assertNotNull(((CacheQueryReadEvent<?, ?>)e).row());
+
+ return true;
+ }, EVT_CACHE_QUERY_OBJECT_READ);
+ }
+
+ grid(0).cache("test_event").query(new SqlFieldsQuery("INSERT INTO test_event VALUES (?), (?), (?)")
+ .setArgs(0, 1, 2)).getAll();
+
+ grid(0).cache("test_event").query(new SqlFieldsQuery("SELECT * FROM test_event WHERE a IN (?, ?, ?)")
+ .setArgs(0, 1, 3)).getAll();
+
+ assertEquals(2, evtsSqlExec.get(0));
+ assertEquals(0, evtsSqlExec.get(1));
+ assertEquals(2, evtsQryExec.get(0));
+ assertEquals(0, evtsQryExec.get(1));
+ // 1 event fired by insert (number of rows inserted) + 2 events (1 per row selected) fired by the second query.
+ assertEquals(3, evtsQryRead.get(0));
+ assertEquals(0, evtsQryRead.get(1));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index cdacb5c..69716c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -545,6 +545,7 @@
* @see CacheEvent
*/
public static final int EVT_CACHE_OBJECT_EXPIRED = 70;
+
/**
* Built-in event type: cache rebalance started.
* <p>
@@ -955,7 +956,7 @@
* This event is triggered after a corresponding SQL query validated and before it is executed.
* Unlike {@link #EVT_CACHE_QUERY_EXECUTED}, {@code EVT_SQL_QUERY_EXECUTION} is fired only once for a request
* and does not relate to a specific cache.
- * Enet includes the following information: qurey text and its arguments, security subject id.
+ * Event includes the following information: query text and its arguments, security subject id.
*
* <p>
* NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7556fdc..d6a7b7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -3088,7 +3088,7 @@
QueryEngine qryEngine = engineForQuery(cliCtx, qry);
if (qryEngine != null) {
- QueryProperties qryProps = new QueryProperties(keepBinary);
+ QueryProperties qryProps = new QueryProperties(cctx == null ? null : cctx.name(), keepBinary);
if (qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isBatched()) {
res = qryEngine.queryBatched(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
index fa41ffb..9e2bad2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
@@ -17,15 +17,21 @@
package org.apache.ignite.internal.processors.query;
+import org.jetbrains.annotations.Nullable;
+
/**
* Additional properties to execute the query (Stored in {@link QueryContext}).
*/
public final class QueryProperties {
/** */
+ @Nullable String cacheName;
+
+ /** */
private final boolean keepBinary;
/** */
- public QueryProperties(boolean keepBinary) {
+ public QueryProperties(@Nullable String cacheName, boolean keepBinary) {
+ this.cacheName = cacheName;
this.keepBinary = keepBinary;
}
@@ -33,4 +39,9 @@
public boolean keepBinary() {
return keepBinary;
}
+
+ /** */
+ public @Nullable String cacheName() {
+ return cacheName;
+ }
}