IGNITE-16136 Fix deadlock on system thread pool during marshaller mapping and binary metadata requests (#10204)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index ceac3ef..327e40d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -363,7 +363,13 @@
     public void onMappingAccepted(final MarshallerMappingItem item) {
         ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId());
 
-        cache.replace(item.typeId(), new MappedName(item.className(), true));
+        MappedName oldMappedName = cache.put(item.typeId(), new MappedName(item.className(), true));
+
+        assert oldMappedName == null || item.className().equals(oldMappedName.className()) :
+            "Class name resolved from cluster: "
+                + item.className()
+                + ", class name from local cache: "
+                + oldMappedName.className();
 
         closProc.runLocalSafe(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), item.className()));
     }
@@ -498,30 +504,6 @@
         return null;
     }
 
-    /**
-     * @param item Item.
-     * @param resolvedClsName Resolved class name.
-     */
-    public void onMissedMappingResolved(final MarshallerMappingItem item, String resolvedClsName) {
-        ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId());
-
-        int typeId = item.typeId();
-        MappedName mappedName = cache.get(typeId);
-
-        if (mappedName != null)
-            assert resolvedClsName.equals(mappedName.className()) :
-                    "Class name resolved from cluster: "
-                            + resolvedClsName
-                            + ", class name from local cache: "
-                            + mappedName.className();
-        else {
-            mappedName = new MappedName(resolvedClsName, true);
-            cache.putIfAbsent(typeId, mappedName);
-
-            closProc.runLocalSafe(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), resolvedClsName));
-        }
-    }
-
     /** {@inheritDoc} */
     @Override public boolean isSystemType(String typeName) {
         return sysTypesSet.contains(typeName);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index d96b502..cc6c014 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -53,7 +53,6 @@
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
-
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata;
@@ -354,13 +353,15 @@
     }
 
     /**
-     * Allows client node to request latest version of binary metadata for a given typeId from the cluster in case
+     * Allows client node to request the latest version of binary metadata for a given typeId from the cluster in case
      * client is able to detect that it has obsolete metadata in its local cache.
      *
      * @param typeId ID of binary type.
      * @return future to wait for request arrival on.
      */
     GridFutureAdapter<MetadataUpdateResult> requestUpToDateMetadata(int typeId) {
+        assert ctx.clientNode();
+
         ClientMetadataRequestFuture newFut = new ClientMetadataRequestFuture(ctx, typeId, clientReqSyncMap);
 
         ClientMetadataRequestFuture oldFut = clientReqSyncMap.putIfAbsent(typeId, newFut);
@@ -604,35 +605,12 @@
                     fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError()));
                 else {
                     if (clientNode) {
-                        BinaryMetadataHolder newHolder = new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer);
+                        boolean success = casBinaryMetadata(typeId, new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer));
 
-                        holder = metaLocCache.putIfAbsent(typeId, newHolder);
-
-                        if (holder != null) {
-                            boolean obsoleteUpd = false;
-
-                            do {
-                                holder = metaLocCache.get(typeId);
-
-                                if (obsoleteUpdate(
-                                    holder.pendingVersion(),
-                                    holder.acceptedVersion(),
-                                    pendingVer,
-                                    acceptedVer)) {
-                                    obsoleteUpd = true;
-
-                                    fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
-
-                                    break;
-                                }
-                            }
-                            while (!metaLocCache.replace(typeId, holder, newHolder));
-
-                            if (!obsoleteUpd)
-                                initSyncFor(typeId, pendingVer, fut);
-                        }
-                        else
+                        if (success)
                             initSyncFor(typeId, pendingVer, fut);
+                        else
+                            fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
                     }
                     else {
                         initSyncFor(typeId, pendingVer, fut);
@@ -659,24 +637,8 @@
 
                         BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer);
 
-                        if (clientNode) {
-                            holder = metaLocCache.putIfAbsent(typeId, newHolder);
-
-                            if (holder != null) {
-                                do {
-                                    holder = metaLocCache.get(typeId);
-
-                                    if (obsoleteUpdate(
-                                        holder.pendingVersion(),
-                                        holder.acceptedVersion(),
-                                        pendingVer,
-                                        acceptedVer))
-                                        break;
-
-                                }
-                                while (!metaLocCache.replace(typeId, holder, newHolder));
-                            }
-                        }
+                        if (clientNode)
+                            casBinaryMetadata(typeId, newHolder);
                         else {
                             if (log.isDebugEnabled())
                                 log.debug("Updated metadata on server node [holder=" + newHolder +
@@ -717,18 +679,13 @@
             int newAcceptedVer = msg.acceptedVersion();
 
             if (clientNode) {
-                BinaryMetadataHolder newHolder = new BinaryMetadataHolder(holder.metadata(),
-                    holder.pendingVersion(), newAcceptedVer);
+                boolean success = casBinaryMetadata(typeId,
+                    new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), newAcceptedVer));
 
-                do {
-                    holder = metaLocCache.get(typeId);
+                ClientMetadataRequestFuture fut = clientReqSyncMap.get(typeId);
 
-                    int oldAcceptedVer = holder.acceptedVersion();
-
-                    if (oldAcceptedVer > newAcceptedVer)
-                        break;
-                }
-                while (!metaLocCache.replace(typeId, holder, newHolder));
+                if (success && fut != null)
+                    fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
             }
             else {
                 int oldAcceptedVer = holder.acceptedVersion();
@@ -784,6 +741,29 @@
     }
 
     /**
+     * @param typeId Type id.
+     * @param newHolder New binary metadata holder.
+     * @return {@code true} if new holder was added successfully.
+     */
+    private boolean casBinaryMetadata(int typeId, BinaryMetadataHolder newHolder) {
+        BinaryMetadataHolder oldHolder;
+
+        do {
+            oldHolder = metaLocCache.putIfAbsent(typeId, newHolder);
+
+            if (oldHolder == null)
+                return true;
+
+            if (obsoleteUpdate(oldHolder.pendingVersion(), oldHolder.acceptedVersion(),
+                newHolder.pendingVersion(), newHolder.acceptedVersion()))
+                return false;
+        }
+        while (!metaLocCache.replace(typeId, oldHolder, newHolder));
+
+        return true;
+    }
+
+    /**
      * Future class responsible for blocking threads until particular events with metadata updates happen, e.g. arriving
      * {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response.
      */
@@ -915,24 +895,7 @@
             }
 
             try {
-                BinaryMetadataHolder newHolder = U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config()));
-
-                BinaryMetadataHolder oldHolder = metaLocCache.putIfAbsent(typeId, newHolder);
-
-                if (oldHolder != null) {
-                    do {
-                        oldHolder = metaLocCache.get(typeId);
-
-                        // typeId metadata cannot be removed after initialization.
-                        if (obsoleteUpdate(
-                            oldHolder.pendingVersion(),
-                            oldHolder.acceptedVersion(),
-                            newHolder.pendingVersion(),
-                            newHolder.acceptedVersion()))
-                            break;
-                    }
-                    while (!metaLocCache.replace(typeId, oldHolder, newHolder));
-                }
+                casBinaryMetadata(typeId, U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config())));
 
                 fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
index 0be4e09..f7d9732 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
@@ -97,12 +97,12 @@
 
                 try {
                     ioMgr.sendToGridTopic(
-                            srvNode,
-                            GridTopic.TOPIC_MAPPING_MARSH,
-                            new MissingMappingRequestMessage(
-                                    item.platformId(),
-                                    item.typeId()),
-                            GridIoPolicy.SYSTEM_POOL);
+                        srvNode,
+                        GridTopic.TOPIC_MAPPING_MARSH,
+                        new MissingMappingRequestMessage(
+                            item.platformId(),
+                            item.typeId()),
+                        GridIoPolicy.SYSTEM_POOL);
 
                     if (discoMgr.node(srvNode.id()) == null)
                         continue;
@@ -113,21 +113,22 @@
                 }
                 catch (IgniteCheckedException ignored) {
                     U.warn(log,
-                            "Failed to request marshaller mapping from remote node (proceeding with the next one): "
-                                    + srvNode);
+                        "Failed to request marshaller mapping from remote node (proceeding with the next one): "
+                            + srvNode);
                 }
             }
 
             noSrvsInCluster = pendingNode == null;
         }
 
-        if (noSrvsInCluster)
+        if (noSrvsInCluster) {
             onDone(MappingExchangeResult.createFailureResult(
-                    new IgniteCheckedException(
-                            "All server nodes have left grid, cannot request mapping [platformId: "
-                                    + item.platformId()
-                                    + "; typeId: "
-                                    + item.typeId() + "]")));
+                new IgniteCheckedException(
+                    "All server nodes have left grid, cannot request mapping [platformId: "
+                        + item.platformId()
+                        + "; typeId: "
+                        + item.typeId() + "]")));
+        }
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 6231da6..f689573 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -216,7 +216,7 @@
 
             if (fut != null) {
                 if (resolvedClsName != null) {
-                    marshallerCtx.onMissedMappingResolved(item, resolvedClsName);
+                    marshallerCtx.onMappingAccepted(new MarshallerMappingItem(platformId, typeId, resolvedClsName));
 
                     fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName));
                 }
@@ -312,6 +312,11 @@
                 }
             });
 
+            ClientRequestFuture rqFut = clientReqSyncMap.get(new MarshallerMappingItem(item.platformId(), item.typeId(), null));
+
+            if (rqFut != null)
+                rqFut.onDone(MappingExchangeResult.createSuccessfulResult(item.className()));
+
             GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item);
 
             if (fut != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
index c13c48e..eae07d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.marshaller;
 
 import java.io.Serializable;
+import java.util.Objects;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
 /**
@@ -57,6 +58,24 @@
     }
 
     /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        MappedName name = (MappedName)o;
+
+        return accepted == name.accepted && Objects.equals(clsName, name.clsName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return Objects.hash(clsName, accepted);
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(MappedName.class, this);
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
index 609c537..f715eb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.marshaller;
 
 import java.io.Serializable;
+import java.util.Objects;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -84,7 +85,7 @@
 
         return platformId == that.platformId
                 && typeId == that.typeId
-                && (clsName != null ? clsName.equals(that.clsName) : that.clsName == null);
+                && (Objects.equals(clsName, that.clsName));
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
index eb50850..bd0f9b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
@@ -135,27 +135,28 @@
      * @param cache Cache.
      */
     public GridFutureAdapter<MappingExchangeResult> requestMapping(
-            MarshallerMappingItem item,
-            ConcurrentMap<Integer, MappedName> cache
+        MarshallerMappingItem item,
+        ConcurrentMap<Integer, MappedName> cache
     ) {
+        assert ctx.clientNode();
+        assert item.className() == null;
+
         ClientRequestFuture newFut = new ClientRequestFuture(ctx, item, clientReqSyncMap);
-
         ClientRequestFuture oldFut = clientReqSyncMap.putIfAbsent(item, newFut);
-
-        if (oldFut != null)
-            return oldFut;
+        ClientRequestFuture resFut = oldFut == null ? newFut : oldFut;
 
         MappedName mappedName = cache.get(item.typeId());
 
-        if (mappedName != null) {
-            newFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
-
-            return newFut;
+        if (mappedName == null) {
+            if (oldFut == null)
+                newFut.requestMapping();
+        }
+        else {
+            if (mappedName.accepted())
+                resFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
         }
 
-        newFut.requestMapping();
-
-        return newFut;
+        return resFut;
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
new file mode 100644
index 0000000..5f19e41
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.UnaryOperator;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
+import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage;
+import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
+import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem;
+import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.events.EventType.EVTS_CACHE;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/** */
+public class IgniteMarshallerCacheClientRequestsMappingTest extends GridCommonAbstractTest {
+    /** Waiting timeout. */
+    private static final long AWAIT_PROCESSING_TIMEOUT_MS = 10_000L;
+
+    /** Limited thread pool size. */
+    private static final int LIMITED_SYSTEM_THREAD_POOL = 4;
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** External class loader. */
+    private static final ClassLoader extClsLdr = getExternalClassLoader();
+
+    /** Person class name. */
+    private static final String PERSON_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.Person";
+
+    /** Organization class name. */
+    private static final String ORGANIZATION_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.Organization";
+
+    /** Address class name. */
+    private static final String ADDRESS_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.Address";
+
+    /** Compute job result class name. */
+    private static final String JOB_RESULT_CLASS_NAME_PREFIX = "org.apache.ignite.tests.p2p.compute.ResultV";
+
+    /** Client work directory absolute path. */
+    private String clntWorkDir;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        if (cfg.isClientMode())
+            cfg.setWorkDirectory(clntWorkDir);
+
+        cfg.setClassLoader(extClsLdr);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+        cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setWriteSynchronizationMode(FULL_SYNC));
+        cfg.setIncludeEventTypes(EVTS_CACHE);
+
+        cfg.setSystemThreadPoolSize(LIMITED_SYSTEM_THREAD_POOL);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        clntWorkDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "clnt", true).getAbsolutePath();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        Path path = Paths.get(clntWorkDir, DataStorageConfiguration.DFLT_MARSHALLER_PATH);
+
+        for (File file : Objects.requireNonNull(path.toFile().listFiles()))
+            Files.delete(file.toPath());
+
+        Files.deleteIfExists(path);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDiscoeryMarshallerDelayedWithOverfloodThreadPool() throws Exception {
+        doTestMarshallingBinaryMappingsLoadedFromClient(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testDiscoeryBinaryMetaDelayedWithOverfloodThreadPool() throws Exception {
+        doTestMarshallingBinaryMappingsLoadedFromClient(false);
+    }
+
+    /**
+     * @param receiveMetadataOnClientJoin If {@code true} than binary metadata will exist on the server node and loaded
+     * by the client node on the node join exchange, otherwise it will be requested by client peer-2-peer though the TcpCommunicationSpi.
+     * @throws Exception If fails.
+     */
+    private void doTestMarshallingBinaryMappingsLoadedFromClient(boolean receiveMetadataOnClientJoin) throws Exception {
+        CountDownLatch delayMappingLatch = new CountDownLatch(1);
+        AtomicInteger loadKeys = new AtomicInteger(100);
+        CountDownLatch evtReceiveLatch = new CountDownLatch(1);
+        int initialKeys = receiveMetadataOnClientJoin ? 10 : 0;
+
+        IgniteEx srv1 = startGrid(0);
+
+        TestRecordingCommunicationSpi.spi(srv1)
+            .blockMessages((IgniteBiPredicate<ClusterNode, Message>)(node, msg) -> msg instanceof MissingMappingResponseMessage ||
+                msg instanceof MetadataResponseMessage);
+
+        // Load data pior to the client note starts, so the client will receive the binary metadata on the client node join.
+        for (int i = 0; i < initialKeys; i++)
+            srv1.cache(DEFAULT_CACHE_NAME).put(i, createOrganization(extClsLdr, i));
+
+        Ignite cl1 = startClientGrid(1,
+            (UnaryOperator<IgniteConfiguration>)cfg -> cfg.setDiscoverySpi(new TcpDiscoverySpi() {
+                @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+                    if (msg instanceof TcpDiscoveryCustomEventMessage) {
+                        try {
+                            DiscoverySpiCustomMessage custom =
+                                ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader());
+
+                            if (custom instanceof CustomMessageWrapper) {
+                                DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate();
+
+                                if (delegate instanceof MappingAcceptedMessage) {
+                                    MarshallerMappingItem item = GridTestUtils.getFieldValue(delegate, "item");
+
+                                    if (item.className().equals(PERSON_CLASS_NAME) ||
+                                        item.className().equals(ORGANIZATION_CLASS_NAME) ||
+                                        item.className().equals(ADDRESS_CLASS_NAME)
+                                    ) {
+                                        try {
+                                            U.await(delayMappingLatch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+                                        }
+                                        catch (Exception e) {
+                                            fail("Mapping proposed message must be released.");
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                        catch (Throwable e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    super.startMessageProcess(msg);
+                }
+            }.setIpFinder(IP_FINDER)));
+
+        awaitPartitionMapExchange();
+
+        cl1.events().remoteListen(
+            (IgniteBiPredicate<UUID, Event>)(uuid, evt) -> {
+                info("Event [" + evt.shortDisplay() + ']');
+
+                evtReceiveLatch.countDown();
+
+                return true;
+            },
+            t -> true,
+            EVT_CACHE_OBJECT_PUT);
+
+        // Flood system thread pool with cache events.
+        GridTestUtils.runMultiThreadedAsync((Callable<Boolean>)() -> {
+            int key;
+
+            while ((key = loadKeys.decrementAndGet()) > initialKeys && !Thread.currentThread().isInterrupted())
+                srv1.cache(DEFAULT_CACHE_NAME).put(key, createOrganization(extClsLdr, key));
+
+            return true;
+        }, 8, "cache-adder-thread").get();
+
+        assertTrue(GridTestUtils.waitForCondition(() -> TestRecordingCommunicationSpi.spi(srv1).hasBlockedMessages(),
+            AWAIT_PROCESSING_TIMEOUT_MS));
+
+        delayMappingLatch.countDown();
+
+        assertTrue(U.await(evtReceiveLatch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBinaryMetaDelayedForComputeJobResult() throws Exception {
+        CountDownLatch latch = new CountDownLatch(1);
+
+        startGrid(0);
+
+        Ignite cl1 = startClientGrid(1, (UnaryOperator<IgniteConfiguration>)cfg ->
+            cfg.setDiscoverySpi(new TcpDiscoverySpi() {
+                @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+                    if (msg instanceof TcpDiscoveryCustomEventMessage) {
+                        try {
+                            DiscoverySpiCustomMessage custom =
+                                ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader());
+
+                            if (custom instanceof CustomMessageWrapper) {
+                                DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate();
+
+                                if (delegate instanceof MappingProposedMessage) {
+                                    MarshallerMappingItem item = GridTestUtils.getFieldValue(delegate, "mappingItem");
+
+                                    if (item.className().contains(JOB_RESULT_CLASS_NAME_PREFIX)) {
+                                        try {
+                                            U.await(latch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+                                        }
+                                        catch (Exception e) {
+                                            fail("Exception must never be thrown: " + e.getMessage());
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                        catch (Throwable e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    super.startMessageProcess(msg);
+                }
+            }.setIpFinder(IP_FINDER)));
+
+        AtomicInteger results = new AtomicInteger(4);
+
+        // Flood system thread pool with task results.
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync((Callable<Boolean>)() -> {
+            int v;
+
+            while ((v = results.decrementAndGet()) >= 0) {
+                int v0 = v;
+
+                Object ignore = cl1.compute().call(() -> createResult(extClsLdr, v0));
+            }
+
+            return true;
+        }, LIMITED_SYSTEM_THREAD_POOL, "compute-thread");
+
+        latch.countDown();
+
+        fut.get(AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @param extClsLdr Class loader.
+     * @param key Cache key.
+     * @return Organization.
+     * @throws Exception If failed.
+     */
+    private static Object createOrganization(ClassLoader extClsLdr, int key) throws Exception {
+        Class<?> personCls = extClsLdr.loadClass(PERSON_CLASS_NAME);
+        Class<?> addrCls = extClsLdr.loadClass(ADDRESS_CLASS_NAME);
+
+        Constructor<?> personConstructor = personCls.getConstructor(String.class);
+        Constructor<?> addrConstructor = addrCls.getConstructor(String.class, Integer.TYPE);
+        Constructor<?> organizationConstructor = extClsLdr.loadClass(ORGANIZATION_CLASS_NAME)
+            .getConstructor(String.class, personCls, addrCls);
+
+        return organizationConstructor.newInstance("Organization " + key,
+            personConstructor.newInstance("Persone name " + key),
+            addrConstructor.newInstance("Street " + key, key));
+    }
+
+    /**
+     * @param extClsLdr Class loader.
+     * @param ver Class type version.
+     * @return Result.
+     * @throws Exception If fails.
+     */
+    public static Object createResult(ClassLoader extClsLdr, int ver) throws Exception {
+        Class<?> resCls = extClsLdr.loadClass(JOB_RESULT_CLASS_NAME_PREFIX + ver);
+
+        return resCls.getConstructor(int.class).newInstance(ver);
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index cf53ea1..d61a62e 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1002,7 +1002,7 @@
      * @throws Exception If anything failed.
      */
     protected IgniteEx startClientGrid() throws Exception {
-        return startClientGrid(getTestIgniteInstanceName());
+        return startClientGrid(getTestIgniteInstanceName(), (UnaryOperator<IgniteConfiguration>)null);
     }
 
     /**
@@ -1061,6 +1061,16 @@
     }
 
     /**
+     * @param idx Index of the grid to start.
+     * @param cfgOp Configuration mutator. Can be used to avoid overcomplification of {@link #getConfiguration()}.
+     * @return Started grid.
+     * @throws Exception If anything failed.
+     */
+    protected IgniteEx startClientGrid(int idx, UnaryOperator<IgniteConfiguration> cfgOp) throws Exception {
+        return startClientGrid(getTestIgniteInstanceName(idx), cfgOp);
+    }
+
+    /**
      * Starts new client grid with given index.
      *
      * @param idx Index of the grid to start.
@@ -1068,7 +1078,7 @@
      * @throws Exception If anything failed.
      */
     protected IgniteEx startClientGrid(int idx) throws Exception {
-        return startClientGrid(getTestIgniteInstanceName(idx));
+        return startClientGrid(getTestIgniteInstanceName(idx), (UnaryOperator<IgniteConfiguration>)null);
     }
 
     /**
@@ -1083,7 +1093,7 @@
         IgnitionEx.dependencyResolver(rslvr);
 
         try {
-            return startClientGrid(getTestIgniteInstanceName(idx));
+            return startClientGrid(getTestIgniteInstanceName(idx), (UnaryOperator<IgniteConfiguration>)null);
         }
         finally {
             IgnitionEx.dependencyResolver(null);
@@ -1098,10 +1108,22 @@
      * @throws Exception If anything failed.
      */
     protected IgniteEx startClientGrid(String igniteInstanceName) throws Exception {
+        return startClientGrid(igniteInstanceName, (UnaryOperator<IgniteConfiguration>)null);
+    }
+
+    /**
+     * Starts new client grid with given name.
+     *
+     * @param igniteInstanceName Ignite instance name.
+     * @param cfgOp Configuration mutator. Can be used to avoid overcomplification of {@link #getConfiguration()}.
+     * @return Started grid.
+     * @throws Exception If anything failed.
+     */
+    protected IgniteEx startClientGrid(String igniteInstanceName, UnaryOperator<IgniteConfiguration> cfgOp) throws Exception {
         IgnitionEx.setClientMode(true);
 
         try {
-            return (IgniteEx)startGrid(igniteInstanceName, (GridSpringResourceContext)null);
+            return (IgniteEx)startGrid(igniteInstanceName, cfgOp, null);
         }
         finally {
             IgnitionEx.setClientMode(false);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
index e2419dc..8492bd3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
@@ -49,6 +49,7 @@
 import org.apache.ignite.internal.processors.cache.GridLongRunningInitNewCrdFutureDiagnosticsTest;
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClassNameConflictTest;
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingOnMissTest;
+import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingTest;
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheFSRestoreTest;
 import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheSeparateDirectoryTest;
 import org.apache.ignite.internal.processors.cache.RebalanceWithDifferentThreadPoolSizeTest;
@@ -143,6 +144,7 @@
     IgniteMarshallerCacheFSRestoreTest.class,
     IgniteMarshallerCacheClassNameConflictTest.class,
     IgniteMarshallerCacheClientRequestsMappingOnMissTest.class,
+    IgniteMarshallerCacheClientRequestsMappingTest.class,
     IgniteMarshallerCacheSeparateDirectoryTest.class,
 
     IgniteDiagnosticMessagesTest.class,
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/Result.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/Result.java
new file mode 100644
index 0000000..cf680e7
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/Result.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+/** */
+public class Result {
+    /** Param. */
+    final int v;
+
+    /**
+     * @param v Param.
+     */
+    Result(int v) {
+        this.v = v;
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV0.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV0.java
new file mode 100644
index 0000000..1495d07
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV0.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+/** */
+public class ResultV0 extends Result {
+    /**
+     * @param v Param.
+     */
+    public ResultV0(int v) {
+        super(v);
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV1.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV1.java
new file mode 100644
index 0000000..d56a31e
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV1.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+/** */
+public class ResultV1 extends Result {
+    /**
+     * @param v Param.
+     */
+    public ResultV1(int v) {
+        super(v);
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV2.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV2.java
new file mode 100644
index 0000000..eb7e531
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV2.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+/** */
+public class ResultV2 extends Result {
+    /**
+     * @param v Param.
+     */
+    public ResultV2(int v) {
+        super(v);
+    }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV3.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV3.java
new file mode 100644
index 0000000..b0080d1
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV3.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+/** */
+public class ResultV3 extends Result {
+    /**
+     * @param v Param.
+     */
+    public ResultV3(int v) {
+        super(v);
+    }
+}