IGNITE-11798 Fix memory leak on unstable topology caused by partition reservation (#7251)


diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
index b2ce082..0fad2c4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationKey.java
@@ -19,6 +19,7 @@
 
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
  * Partition reservation key.
@@ -48,6 +49,13 @@
         return cacheName;
     }
 
+    /**
+     * @return Topology version of reservation.
+     */
+    public AffinityTopologyVersion topologyVersion() {
+        return topVer;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean equals(Object o) {
         if (this == o)
@@ -70,4 +78,9 @@
 
         return res;
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(PartitionReservationKey.class, this);
+    }
 }
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
index dfaef24..5e986d07 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/PartitionReservationManager.java
@@ -17,20 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.PartitionLossPolicy;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.Nullable;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -38,6 +24,24 @@
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.PartitionLossPolicy;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsReservation;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_ONLY_SAFE;
 import static org.apache.ignite.cache.PartitionLossPolicy.READ_WRITE_SAFE;
@@ -46,19 +50,25 @@
 import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
 
 /**
- * Class responsible for partition reservation for queries executed on local node.
- * Prevents partitions from being eveicted from node during query execution.
+ * Class responsible for partition reservation for queries executed on local node. Prevents partitions from being
+ * evicted from node during query execution.
  */
-public class PartitionReservationManager {
+public class PartitionReservationManager implements PartitionsExchangeAware {
     /** Special instance of reservable object for REPLICATED caches. */
     private static final ReplicatedReservable REPLICATED_RESERVABLE = new ReplicatedReservable();
 
     /** Kernal context. */
     private final GridKernalContext ctx;
 
-    /** Reservations. */
+    /**
+     * Group reservations cache. When affinity version is not changed and all primary partitions must be reserved we get
+     * group reservation from this map instead of create new reservation group.
+     */
     private final ConcurrentMap<PartitionReservationKey, GridReservable> reservations = new ConcurrentHashMap<>();
 
+    /** Logger. */
+    private final IgniteLogger log;
+
     /**
      * Constructor.
      *
@@ -66,11 +76,43 @@
      */
     public PartitionReservationManager(GridKernalContext ctx) {
         this.ctx = ctx;
+
+        log = ctx.log(PartitionReservationManager.class);
+
+        ctx.cache().context().exchange().registerExchangeAwareComponent(this);
+    }
+
+    /**
+     * Decide whether to ignore or proceed with lost partition.
+     *
+     * @param cctx Cache context.
+     * @param part Partition.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static void ignoreLostPartitionIfPossible(GridCacheContext cctx, GridDhtLocalPartition part)
+        throws IgniteCheckedException {
+        PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
+
+        if (plc != null) {
+            if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE) {
+                throw new CacheInvalidStateException("Failed to execute query because cache partition has been " +
+                    "lost [cacheName=" + cctx.name() + ", part=" + part + ']');
+            }
+        }
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param p Partition ID.
+     * @return Partition.
+     */
+    private static GridDhtLocalPartition partition(GridCacheContext<?, ?> cctx, int p) {
+        return cctx.topology().localPartition(p, NONE, false);
     }
 
     /**
      * @param cacheIds Cache IDs.
-     * @param topVer Topology version.
+     * @param reqTopVer Topology version from request.
      * @param explicitParts Explicit partitions list.
      * @param nodeId Node ID.
      * @param reqId Request ID.
@@ -79,18 +121,18 @@
      */
     public PartitionReservation reservePartitions(
         @Nullable List<Integer> cacheIds,
-        AffinityTopologyVersion topVer,
+        AffinityTopologyVersion reqTopVer,
         final int[] explicitParts,
         UUID nodeId,
         long reqId
     ) throws IgniteCheckedException {
-        assert topVer != null;
+        assert reqTopVer != null;
+
+        AffinityTopologyVersion topVer = ctx.cache().context().exchange().lastAffinityChangedTopologyVersion(reqTopVer);
 
         if (F.isEmpty(cacheIds))
             return new PartitionReservation(Collections.emptyList());
 
-        List<GridReservable> reserved = new ArrayList<>();
-
         Collection<Integer> partIds;
 
         if (explicitParts == null)
@@ -104,6 +146,8 @@
                 partIds.add(explicitPart);
         }
 
+        List<GridReservable> reserved = new ArrayList<>();
+
         for (int i = 0; i < cacheIds.size(); i++) {
             GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(cacheIds.get(i));
 
@@ -111,8 +155,8 @@
             if (cctx == null) {
                 return new PartitionReservation(reserved,
                     String.format("Failed to reserve partitions for query (cache is not " +
-                    "found on local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]",
-                    ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i)));
+                            "found on local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]",
+                        ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i)));
             }
 
             if (cctx.isLocal() || !cctx.rebalanceEnabled())
@@ -128,8 +172,8 @@
                     if (!r.reserve())
                         return new PartitionReservation(reserved,
                             String.format("Failed to reserve partitions for query (group " +
-                            "reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
-                            "cacheName=%s]",ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name()));
+                                "reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
+                                "cacheName=%s]", ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name()));
 
                     reserved.add(r);
                 }
@@ -147,20 +191,20 @@
 
                             if (partState != OWNING)
                                 return new PartitionReservation(reserved,
-                                        String.format("Failed to reserve partitions for " +
-                                        "query (partition of REPLICATED cache is not in OWNING state) [" +
-                                        "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
-                                        "cacheName=%s, part=%s, partFound=%s, partState=%s]",
-                                    ctx.localNodeId(),
-                                    nodeId,
-                                    reqId,
-                                    topVer,
-                                    cacheIds.get(i),
-                                    cctx.name(),
-                                    p,
-                                    (part != null),
-                                    partState
-                                ));
+                                    String.format("Failed to reserve partitions for " +
+                                            "query (partition of REPLICATED cache is not in OWNING state) [" +
+                                            "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
+                                            "cacheName=%s, part=%s, partFound=%s, partState=%s]",
+                                        ctx.localNodeId(),
+                                        nodeId,
+                                        reqId,
+                                        topVer,
+                                        cacheIds.get(i),
+                                        cctx.name(),
+                                        p,
+                                        (part != null),
+                                        partState
+                                    ));
                         }
 
                         // Mark that we checked this replicated cache.
@@ -183,9 +227,28 @@
                                 ignoreLostPartitionIfPossible(cctx, part);
                             else {
                                 return new PartitionReservation(reserved,
-                                        String.format("Failed to reserve partitions " +
-                                        "for query (partition of PARTITIONED cache is not found or not in OWNING " +
-                                        "state) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
+                                    String.format("Failed to reserve partitions " +
+                                            "for query (partition of PARTITIONED cache is not found or not in OWNING " +
+                                            "state) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
+                                            "cacheName=%s, part=%s, partFound=%s, partState=%s]",
+                                        ctx.localNodeId(),
+                                        nodeId,
+                                        reqId,
+                                        topVer,
+                                        cacheIds.get(i),
+                                        cctx.name(),
+                                        partId,
+                                        (part != null),
+                                        partState
+                                    ));
+                            }
+                        }
+
+                        if (!part.reserve()) {
+                            return new PartitionReservation(reserved,
+                                String.format("Failed to reserve partitions for query " +
+                                        "(partition of PARTITIONED cache cannot be reserved) [" +
+                                        "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
                                         "cacheName=%s, part=%s, partFound=%s, partState=%s]",
                                     ctx.localNodeId(),
                                     nodeId,
@@ -194,28 +257,9 @@
                                     cacheIds.get(i),
                                     cctx.name(),
                                     partId,
-                                    (part != null),
+                                    true,
                                     partState
                                 ));
-                            }
-                        }
-
-                        if (!part.reserve()) {
-                            return new PartitionReservation(reserved,
-                                    String.format("Failed to reserve partitions for query " +
-                                    "(partition of PARTITIONED cache cannot be reserved) [" +
-                                    "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
-                                    "cacheName=%s, part=%s, partFound=%s, partState=%s]",
-                                ctx.localNodeId(),
-                                nodeId,
-                                reqId,
-                                topVer,
-                                cacheIds.get(i),
-                                cctx.name(),
-                                partId,
-                                true,
-                                partState
-                            ));
                         }
 
                         reserved.add(part);
@@ -230,19 +274,19 @@
                                 ignoreLostPartitionIfPossible(cctx, part);
                             else {
                                 return new PartitionReservation(reserved,
-                                        String.format("Failed to reserve partitions for " +
-                                        "query (partition of PARTITIONED cache is not in OWNING state after " +
-                                        "reservation) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, " +
-                                        "cacheId=%s, cacheName=%s, part=%s, partState=%s]",
-                                    ctx.localNodeId(),
-                                    nodeId,
-                                    reqId,
-                                    topVer,
-                                    cacheIds.get(i),
-                                    cctx.name(),
-                                    partId,
-                                    partState
-                                ));
+                                    String.format("Failed to reserve partitions for " +
+                                            "query (partition of PARTITIONED cache is not in OWNING state after " +
+                                            "reservation) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, " +
+                                            "cacheId=%s, cacheName=%s, part=%s, partState=%s]",
+                                        ctx.localNodeId(),
+                                        nodeId,
+                                        reqId,
+                                        topVer,
+                                        cacheIds.get(i),
+                                        cctx.name(),
+                                        partId,
+                                        partState
+                                    ));
                             }
                         }
                     }
@@ -281,31 +325,31 @@
     }
 
     /**
-     * Decide whether to ignore or proceed with lost partition.
-     *
-     * @param cctx Cache context.
-     * @param part Partition.
-     * @throws IgniteCheckedException If failed.
+     * Cleanup group reservations cache on change affinity version.
      */
-    private static void ignoreLostPartitionIfPossible(GridCacheContext cctx, GridDhtLocalPartition part)
-        throws IgniteCheckedException {
-        PartitionLossPolicy plc = cctx.config().getPartitionLossPolicy();
+    @Override public void onDoneAfterTopologyUnlock(final GridDhtPartitionsExchangeFuture fut) {
+        try {
+            // Must not do anything at the exchange thread. Dispatch to the management thread pool.
+            ctx.closure().runLocal(
+                new GridPlainRunnable() {
+                    @Override public void run() {
+                        AffinityTopologyVersion topVer = ctx.cache().context().exchange()
+                            .lastAffinityChangedTopologyVersion(fut.topologyVersion());
 
-        if (plc != null) {
-            if (plc == READ_ONLY_SAFE || plc == READ_WRITE_SAFE) {
-                throw new CacheInvalidStateException("Failed to execute query because cache partition has been " +
-                    "lost [cacheName=" + cctx.name() + ", part=" + part + ']');
-            }
+                        reservations.forEach((key, r) -> {
+                            if (r != REPLICATED_RESERVABLE && !F.eq(key.topologyVersion(), topVer)) {
+                                assert r instanceof GridDhtPartitionsReservation;
+
+                                ((GridDhtPartitionsReservation)r).invalidate();
+                            }
+                        });
+                    }
+                },
+                GridIoPolicy.MANAGEMENT_POOL);
         }
-    }
-
-    /**
-     * @param cctx Cache context.
-     * @param p Partition ID.
-     * @return Partition.
-     */
-    private static GridDhtLocalPartition partition(GridCacheContext<?, ?> cctx, int p) {
-        return cctx.topology().localPartition(p, NONE, false);
+        catch (Throwable e) {
+            log.error("Unexpected exception on start reservations cleanup", e);
+        }
     }
 
     /**
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MemLeakOnSqlWithClientReconnectTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MemLeakOnSqlWithClientReconnectTest.java
new file mode 100644
index 0000000..af9ced4
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/MemLeakOnSqlWithClientReconnectTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.index.AbstractIndexingCommonTest;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.junit.Test;
+
+/**
+ * Tests for group reservation leaks at the PartitionReservationManager on unstable topology.
+ */
+public class MemLeakOnSqlWithClientReconnectTest extends AbstractIndexingCommonTest {
+    /** Keys count. */
+    private static final int KEY_CNT = 10;
+
+    /** Keys count. */
+    private static final int ITERS = 2000;
+
+    /** Replicated cache schema name. */
+    private static final String REPL_SCHEMA = "REPL";
+
+    /** Partitioned cache schema name. */
+    private static final String PART_SCHEMA = "PART";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (igniteInstanceName.startsWith("cli"))
+            cfg.setClientMode(true).setGridLogger(new NullLogger());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        startGrid();
+
+        IgniteCache<Long, Long> partCache = grid().createCache(new CacheConfiguration<Long, Long>()
+            .setName("part")
+            .setSqlSchema("PART")
+            .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class)
+                .setTableName("test")
+                .addQueryField("id", Long.class.getName(), null)
+                .addQueryField("val", Long.class.getName(), null)
+                .setKeyFieldName("id")
+                .setValueFieldName("val")
+            ))
+            .setAffinity(new RendezvousAffinityFunction(false, 10)));
+
+        IgniteCache<Long, Long> replCache = grid().createCache(new CacheConfiguration<Long, Long>()
+            .setName("repl")
+            .setSqlSchema("REPL")
+            .setQueryEntities(Collections.singleton(new QueryEntity(Long.class, Long.class)
+                .setTableName("test")
+                .addQueryField("id", Long.class.getName(), null)
+                .addQueryField("val", Long.class.getName(), null)
+                .setKeyFieldName("id")
+                .setValueFieldName("val")))
+            .setCacheMode(CacheMode.REPLICATED));
+
+        for (long i = 0; i < KEY_CNT; ++i) {
+            partCache.put(i, i);
+            replCache.put(i, i);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Test partition group reservation leaks on partitioned cache.
+     *
+     * @throws Exception On error.
+     */
+    @Test
+    public void testPartitioned() throws Exception {
+        checkReservationLeak(false);
+    }
+
+    /**
+     * Test partition group reservation leaks on replicated cache.
+     *
+     * @throws Exception On error.
+     */
+    @Test
+    public void testReplicated() throws Exception {
+        checkReservationLeak(true);
+    }
+
+    /**
+     * Check partition group reservation leaks.
+     *
+     * @param replicated Flag to run query on partitioned or replicated cache.
+     * @throws Exception On error.
+     */
+    private void checkReservationLeak(boolean replicated) throws Exception {
+        final AtomicInteger cliNum = new AtomicInteger();
+        final AtomicBoolean end = new AtomicBoolean();
+
+        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(() -> {
+                String name = "cli_" + cliNum.getAndIncrement();
+
+                while (!end.get()) {
+                    try {
+                        startGrid(name);
+
+                        U.sleep(10);
+
+                        stopGrid(name);
+                    }
+                    catch (Exception e) {
+                        fail("Unexpected exception on start test client node");
+                    }
+                }
+            },
+            10, "cli-restart");
+
+        try {
+
+            String mainCliName = "cli-main";
+
+            IgniteEx cli = startGrid(mainCliName);
+
+            // Warm up.
+            runQuery(cli, ITERS, replicated);
+
+            int baseReservations = reservationCount(grid());
+
+            // Run multiple queries on unstable topology.
+            runQuery(cli, ITERS * 10, replicated);
+
+            int curReservations = reservationCount(grid());
+
+            assertTrue("Reservations leaks: [base=" + baseReservations + ", cur=" + curReservations + ']',
+                curReservations < baseReservations * 2);
+
+            log.info("Reservations OK: [base=" + baseReservations + ", cur=" + curReservations + ']');
+        }
+        finally {
+            end.set(true);
+        }
+
+        fut.get();
+    }
+
+    /**
+     * @param ign Ignite.
+     * @param iters Run query 'iters' times
+     * @param repl Run on replicated or partitioned cache.
+     */
+    private void runQuery(IgniteEx ign, int iters, boolean repl) {
+        for (int i = 0; i < iters; ++i)
+            sql(ign, repl ? REPL_SCHEMA : PART_SCHEMA,"SELECT * FROM test").getAll();
+    }
+
+    /**
+     * @param ign Ignite instance.
+     * @param sql SQL query.
+     * @param args Query parameters.
+     * @return Results cursor.
+     */
+    private FieldsQueryCursor<List<?>> sql(IgniteEx ign, String schema, String sql, Object... args) {
+        return ign.context().query().querySqlFields(new SqlFieldsQuery(sql)
+            .setSchema(schema)
+            .setArgs(args), false);
+    }
+
+    /**
+     * @param ign Ignite instance.
+     * @return Count of reservations.
+     */
+    private static int reservationCount(IgniteEx ign) {
+        IgniteH2Indexing idx = (IgniteH2Indexing)ign.context().query().getIndexing();
+
+        Map reservations = GridTestUtils.getFieldValue(idx.partitionReservationManager(), "reservations");
+
+        return reservations.size();
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
index 1de177c..b2e0d24 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
@@ -37,6 +37,7 @@
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicOneNodeTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
 import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
+import org.apache.ignite.internal.processors.query.MemLeakOnSqlWithClientReconnectTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -64,7 +65,8 @@
     CacheContinuousWithTransformerRandomOperationsTest.class,
     CacheContinuousQueryRandomOperationsTest.class,
     StaticCacheDdlTest.class,
-    StaticCacheDdlKeepStaticConfigurationTest.class
+    StaticCacheDdlKeepStaticConfigurationTest.class,
+    MemLeakOnSqlWithClientReconnectTest.class,
 })
 public class IgniteCacheQuerySelfTestSuite6 {
 }