IGNITE-19586 SQL Calcite: Fix SQL/Query events - Fixes #10756.

Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index 2d543a9..7afc674 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -48,6 +48,7 @@
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
 import org.apache.ignite.configuration.QueryEngineConfiguration;
+import org.apache.ignite.events.SqlQueryExecutionEvent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -98,10 +99,12 @@
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.LifecycleAware;
 import org.apache.ignite.internal.processors.query.calcite.util.Service;
+import org.apache.ignite.internal.processors.security.SecurityUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.IgniteSystemProperties.getLong;
+import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
 
 /** */
 public class CalciteQueryProcessor extends GridProcessorAdapter implements QueryEngine {
@@ -521,6 +524,16 @@
 
         qryReg.register(qry);
 
+        if (ctx.event().isRecordable(EVT_SQL_QUERY_EXECUTION)) {
+            ctx.event().record(new SqlQueryExecutionEvent(
+                ctx.discovery().localNode(),
+                "SQL query execution.",
+                sql,
+                params,
+                SecurityUtils.securitySubjectId(ctx))
+            );
+        }
+
         try {
             return action.apply(qry);
         }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 414db87..6adb151 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -35,6 +35,8 @@
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
 import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheQueryReadEvent;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
@@ -44,6 +46,7 @@
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
@@ -95,12 +98,14 @@
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.processors.query.calcite.util.ConvertingClosableIterator;
 import org.apache.ignite.internal.processors.query.calcite.util.ListFieldsQueryCursor;
+import org.apache.ignite.internal.processors.security.SecurityUtils;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.singletonList;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 import static org.apache.ignite.internal.processors.query.calcite.CalciteQueryProcessor.FRAMEWORK_CONFIG;
 import static org.apache.ignite.internal.processors.query.calcite.externalize.RelJsonReader.fromJson;
 
@@ -116,6 +121,9 @@
     private UUID locNodeId;
 
     /** */
+    private GridKernalContext ctx;
+
+    /** */
     private GridEventStorageManager evtMgr;
 
     /** */
@@ -400,6 +408,7 @@
 
     /** {@inheritDoc} */
     @Override public void onStart(GridKernalContext ctx) {
+        this.ctx = ctx;
         localNodeId(ctx.localNodeId());
         exchangeManager(ctx.cache().context().exchange());
         cacheObjectValueContext(ctx.query().objectContext());
@@ -647,8 +656,38 @@
         Function<Object, Object> fieldConverter = (qryProps == null || qryProps.keepBinary()) ? null :
             o -> CacheObjectUtils.unwrapBinaryIfNeeded(objValCtx, o, false, true, null);
 
+        Function<List<Object>, List<Object>> rowConverter = null;
+
+        // Fire EVT_CACHE_QUERY_OBJECT_READ on initiator node before return result to cursor.
+        if (qryProps != null && qryProps.cacheName() != null && evtMgr.isRecordable(EVT_CACHE_QUERY_OBJECT_READ)) {
+            ClusterNode locNode = ctx.discovery().localNode();
+            UUID subjId = SecurityUtils.securitySubjectId(ctx);
+
+            rowConverter = row -> {
+                evtMgr.record(new CacheQueryReadEvent<>(
+                    locNode,
+                    "SQL fields query result set row read.",
+                    EVT_CACHE_QUERY_OBJECT_READ,
+                    CacheQueryType.SQL_FIELDS.name(),
+                    qryProps.cacheName(),
+                    null,
+                    qry.sql(),
+                    null,
+                    null,
+                    qry.parameters(),
+                    subjId,
+                    null,
+                    null,
+                    null,
+                    null,
+                    row));
+
+                return row;
+            };
+        }
+
         Iterator<List<?>> it = new ConvertingClosableIterator<>(iteratorsHolder().iterator(qry.iterator()), ectx,
-            fieldConverter);
+            fieldConverter, rowConverter);
 
         return new ListFieldsQueryCursor<>(plan, it, ectx);
     }
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
index e3d19e18..998f83b 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/util/ConvertingClosableIterator.java
@@ -40,14 +40,19 @@
     @Nullable private final Function<Object, Object> fieldConverter;
 
     /** */
+    @Nullable Function<List<Object>, List<Object>> rowConverter;
+
+    /** */
     public ConvertingClosableIterator(
         Iterator<Row> it,
         ExecutionContext<Row> ectx,
-        @Nullable Function<Object, Object> fieldConverter
+        @Nullable Function<Object, Object> fieldConverter,
+        @Nullable Function<List<Object>, List<Object>> rowConverter
     ) {
         this.it = it;
         rowHnd = ectx.rowHandler();
         this.fieldConverter = fieldConverter;
+        this.rowConverter = rowConverter;
     }
 
     /**
@@ -70,7 +75,7 @@
         for (int i = 0; i < rowSize; i++)
             res.add(fieldConverter == null ? rowHnd.get(i, next) : fieldConverter.apply(rowHnd.get(i, next)));
 
-        return res;
+        return rowConverter == null ? res : rowConverter.apply(res);
     }
 
     /**
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
index 064fba3..e67fb63 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/integration/SqlDiagnosticIntegrationTest.java
@@ -24,13 +24,24 @@
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.SqlConfiguration;
+import org.apache.ignite.events.CacheQueryExecutedEvent;
+import org.apache.ignite.events.CacheQueryReadEvent;
+import org.apache.ignite.events.SqlQueryExecutionEvent;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.junit.Test;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
+import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;
 import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
 import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.cleanPerformanceStatisticsDir;
 import static org.apache.ignite.internal.processors.performancestatistics.AbstractPerformanceStatisticsTest.startCollectStatistics;
@@ -40,6 +51,34 @@
  * Test SQL diagnostic tools.
  */
 public class SqlDiagnosticIntegrationTest extends AbstractBasicIntegrationTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setSqlConfiguration(new SqlConfiguration().setQueryEnginesConfiguration(new CalciteQueryEngineConfiguration()))
+            .setIncludeEventTypes(EVT_SQL_QUERY_EXECUTION, EVT_CACHE_QUERY_EXECUTED, EVT_CACHE_QUERY_OBJECT_READ);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGrids(nodeCount());
+
+        client = startClientGrid();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
     /** */
     @Override protected int nodeCount() {
         return 2;
@@ -125,4 +164,64 @@
         assertTrue("Query reads expected on nodes: " + readsNodes, readsNodes.isEmpty());
         assertEquals(Collections.singleton(lastQryId.get()), readsQueries);
     }
+
+    /** */
+    @Test
+    public void testSqlEvents() {
+        sql("CREATE TABLE test_event (a INT) WITH cache_name=\"test_event\"");
+
+        AtomicIntegerArray evtsSqlExec = new AtomicIntegerArray(nodeCount());
+        AtomicIntegerArray evtsQryExec = new AtomicIntegerArray(nodeCount());
+        AtomicIntegerArray evtsQryRead = new AtomicIntegerArray(nodeCount());
+        for (int i = 0; i < nodeCount(); i++) {
+            int n = i;
+            grid(i).events().localListen(e -> {
+                evtsSqlExec.incrementAndGet(n);
+
+                assertTrue(e instanceof SqlQueryExecutionEvent);
+                assertTrue(((SqlQueryExecutionEvent)e).text().toLowerCase().contains("test_event"));
+
+                return true;
+            }, EVT_SQL_QUERY_EXECUTION);
+
+            grid(i).events().localListen(e -> {
+                evtsQryExec.incrementAndGet(n);
+
+                assertTrue(e instanceof CacheQueryExecutedEvent);
+                assertEquals("test_event", ((CacheQueryExecutedEvent<?, ?>)e).cacheName());
+                assertTrue(((CacheQueryExecutedEvent<?, ?>)e).clause().toLowerCase().contains("test_event"));
+                assertEquals(SQL_FIELDS.name(), ((CacheQueryExecutedEvent<?, ?>)e).queryType());
+                assertEquals(3, ((CacheQueryExecutedEvent<?, ?>)e).arguments().length);
+                assertNull(((CacheQueryExecutedEvent<?, ?>)e).scanQueryFilter());
+                assertNull(((CacheQueryExecutedEvent<?, ?>)e).continuousQueryFilter());
+
+                return true;
+            }, EVT_CACHE_QUERY_EXECUTED);
+
+            grid(i).events().localListen(e -> {
+                evtsQryRead.incrementAndGet(n);
+
+                assertTrue(e instanceof CacheQueryReadEvent);
+                assertEquals(SQL_FIELDS.name(), ((CacheQueryReadEvent<?, ?>)e).queryType());
+                assertTrue(((CacheQueryReadEvent<?, ?>)e).clause().toLowerCase().contains("test_event"));
+                assertNotNull(((CacheQueryReadEvent<?, ?>)e).row());
+
+                return true;
+            }, EVT_CACHE_QUERY_OBJECT_READ);
+        }
+
+        grid(0).cache("test_event").query(new SqlFieldsQuery("INSERT INTO test_event VALUES (?), (?), (?)")
+                .setArgs(0, 1, 2)).getAll();
+
+        grid(0).cache("test_event").query(new SqlFieldsQuery("SELECT * FROM test_event WHERE a IN (?, ?, ?)")
+                .setArgs(0, 1, 3)).getAll();
+
+        assertEquals(2, evtsSqlExec.get(0));
+        assertEquals(0, evtsSqlExec.get(1));
+        assertEquals(2, evtsQryExec.get(0));
+        assertEquals(0, evtsQryExec.get(1));
+        // 1 event fired by insert (number of rows inserted) + 2 events (1 per row selected) fired by the second query.
+        assertEquals(3, evtsQryRead.get(0));
+        assertEquals(0, evtsQryRead.get(1));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
index cdacb5c..69716c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java
+++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java
@@ -545,6 +545,7 @@
      * @see CacheEvent
      */
     public static final int EVT_CACHE_OBJECT_EXPIRED = 70;
+
     /**
      * Built-in event type: cache rebalance started.
      * <p>
@@ -955,7 +956,7 @@
      * This event is triggered after a corresponding SQL query validated and before it is executed.
      * Unlike {@link #EVT_CACHE_QUERY_EXECUTED}, {@code EVT_SQL_QUERY_EXECUTION} is fired only once for a request
      * and does not relate to a specific cache.
-     * Enet includes the following information: qurey text and its arguments, security subject id.
+     * Event includes the following information: query text and its arguments, security subject id.
      *
      * <p>
      * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 7556fdc..d6a7b7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -3088,7 +3088,7 @@
                         QueryEngine qryEngine = engineForQuery(cliCtx, qry);
 
                         if (qryEngine != null) {
-                            QueryProperties qryProps = new QueryProperties(keepBinary);
+                            QueryProperties qryProps = new QueryProperties(cctx == null ? null : cctx.name(), keepBinary);
 
                             if (qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isBatched()) {
                                 res = qryEngine.queryBatched(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
index fa41ffb..9e2bad2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryProperties.java
@@ -17,15 +17,21 @@
 
 package org.apache.ignite.internal.processors.query;
 
+import org.jetbrains.annotations.Nullable;
+
 /**
  * Additional properties to execute the query (Stored in {@link QueryContext}).
  */
 public final class QueryProperties {
     /** */
+    @Nullable String cacheName;
+
+    /** */
     private final boolean keepBinary;
 
     /** */
-    public QueryProperties(boolean keepBinary) {
+    public QueryProperties(@Nullable String cacheName, boolean keepBinary) {
+        this.cacheName = cacheName;
         this.keepBinary = keepBinary;
     }
 
@@ -33,4 +39,9 @@
     public boolean keepBinary() {
         return keepBinary;
     }
+
+    /** */
+    public @Nullable String cacheName() {
+        return cacheName;
+    }
 }