IGNITE-16136 Fix deadlock on system thread pool during marshaller mapping and binary metadata requests (#10204)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index ceac3ef..327e40d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -363,7 +363,13 @@
public void onMappingAccepted(final MarshallerMappingItem item) {
ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId());
- cache.replace(item.typeId(), new MappedName(item.className(), true));
+ MappedName oldMappedName = cache.put(item.typeId(), new MappedName(item.className(), true));
+
+ assert oldMappedName == null || item.className().equals(oldMappedName.className()) :
+ "Class name resolved from cluster: "
+ + item.className()
+ + ", class name from local cache: "
+ + oldMappedName.className();
closProc.runLocalSafe(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), item.className()));
}
@@ -498,30 +504,6 @@
return null;
}
- /**
- * @param item Item.
- * @param resolvedClsName Resolved class name.
- */
- public void onMissedMappingResolved(final MarshallerMappingItem item, String resolvedClsName) {
- ConcurrentMap<Integer, MappedName> cache = getCacheFor(item.platformId());
-
- int typeId = item.typeId();
- MappedName mappedName = cache.get(typeId);
-
- if (mappedName != null)
- assert resolvedClsName.equals(mappedName.className()) :
- "Class name resolved from cluster: "
- + resolvedClsName
- + ", class name from local cache: "
- + mappedName.className();
- else {
- mappedName = new MappedName(resolvedClsName, true);
- cache.putIfAbsent(typeId, mappedName);
-
- closProc.runLocalSafe(new MappingStoreTask(fileStore, item.platformId(), item.typeId(), resolvedClsName));
- }
- }
-
/** {@inheritDoc} */
@Override public boolean isSystemType(String typeName) {
return sysTypesSet.contains(typeName);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index d96b502..cc6c014 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -53,7 +53,6 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.jetbrains.annotations.Nullable;
-
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata;
@@ -354,13 +353,15 @@
}
/**
- * Allows client node to request latest version of binary metadata for a given typeId from the cluster in case
+ * Allows client node to request the latest version of binary metadata for a given typeId from the cluster in case
* client is able to detect that it has obsolete metadata in its local cache.
*
* @param typeId ID of binary type.
* @return future to wait for request arrival on.
*/
GridFutureAdapter<MetadataUpdateResult> requestUpToDateMetadata(int typeId) {
+ assert ctx.clientNode();
+
ClientMetadataRequestFuture newFut = new ClientMetadataRequestFuture(ctx, typeId, clientReqSyncMap);
ClientMetadataRequestFuture oldFut = clientReqSyncMap.putIfAbsent(typeId, newFut);
@@ -604,35 +605,12 @@
fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError()));
else {
if (clientNode) {
- BinaryMetadataHolder newHolder = new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer);
+ boolean success = casBinaryMetadata(typeId, new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer));
- holder = metaLocCache.putIfAbsent(typeId, newHolder);
-
- if (holder != null) {
- boolean obsoleteUpd = false;
-
- do {
- holder = metaLocCache.get(typeId);
-
- if (obsoleteUpdate(
- holder.pendingVersion(),
- holder.acceptedVersion(),
- pendingVer,
- acceptedVer)) {
- obsoleteUpd = true;
-
- fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
-
- break;
- }
- }
- while (!metaLocCache.replace(typeId, holder, newHolder));
-
- if (!obsoleteUpd)
- initSyncFor(typeId, pendingVer, fut);
- }
- else
+ if (success)
initSyncFor(typeId, pendingVer, fut);
+ else
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
}
else {
initSyncFor(typeId, pendingVer, fut);
@@ -659,24 +637,8 @@
BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer);
- if (clientNode) {
- holder = metaLocCache.putIfAbsent(typeId, newHolder);
-
- if (holder != null) {
- do {
- holder = metaLocCache.get(typeId);
-
- if (obsoleteUpdate(
- holder.pendingVersion(),
- holder.acceptedVersion(),
- pendingVer,
- acceptedVer))
- break;
-
- }
- while (!metaLocCache.replace(typeId, holder, newHolder));
- }
- }
+ if (clientNode)
+ casBinaryMetadata(typeId, newHolder);
else {
if (log.isDebugEnabled())
log.debug("Updated metadata on server node [holder=" + newHolder +
@@ -717,18 +679,13 @@
int newAcceptedVer = msg.acceptedVersion();
if (clientNode) {
- BinaryMetadataHolder newHolder = new BinaryMetadataHolder(holder.metadata(),
- holder.pendingVersion(), newAcceptedVer);
+ boolean success = casBinaryMetadata(typeId,
+ new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), newAcceptedVer));
- do {
- holder = metaLocCache.get(typeId);
+ ClientMetadataRequestFuture fut = clientReqSyncMap.get(typeId);
- int oldAcceptedVer = holder.acceptedVersion();
-
- if (oldAcceptedVer > newAcceptedVer)
- break;
- }
- while (!metaLocCache.replace(typeId, holder, newHolder));
+ if (success && fut != null)
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
}
else {
int oldAcceptedVer = holder.acceptedVersion();
@@ -784,6 +741,29 @@
}
/**
+ * @param typeId Type id.
+ * @param newHolder New binary metadata holder.
+ * @return {@code true} if new holder was added successfully.
+ */
+ private boolean casBinaryMetadata(int typeId, BinaryMetadataHolder newHolder) {
+ BinaryMetadataHolder oldHolder;
+
+ do {
+ oldHolder = metaLocCache.putIfAbsent(typeId, newHolder);
+
+ if (oldHolder == null)
+ return true;
+
+ if (obsoleteUpdate(oldHolder.pendingVersion(), oldHolder.acceptedVersion(),
+ newHolder.pendingVersion(), newHolder.acceptedVersion()))
+ return false;
+ }
+ while (!metaLocCache.replace(typeId, oldHolder, newHolder));
+
+ return true;
+ }
+
+ /**
* Future class responsible for blocking threads until particular events with metadata updates happen, e.g. arriving
* {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response.
*/
@@ -915,24 +895,7 @@
}
try {
- BinaryMetadataHolder newHolder = U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config()));
-
- BinaryMetadataHolder oldHolder = metaLocCache.putIfAbsent(typeId, newHolder);
-
- if (oldHolder != null) {
- do {
- oldHolder = metaLocCache.get(typeId);
-
- // typeId metadata cannot be removed after initialization.
- if (obsoleteUpdate(
- oldHolder.pendingVersion(),
- oldHolder.acceptedVersion(),
- newHolder.pendingVersion(),
- newHolder.acceptedVersion()))
- break;
- }
- while (!metaLocCache.replace(typeId, oldHolder, newHolder));
- }
+ casBinaryMetadata(typeId, U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config())));
fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
index 0be4e09..f7d9732 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/ClientRequestFuture.java
@@ -97,12 +97,12 @@
try {
ioMgr.sendToGridTopic(
- srvNode,
- GridTopic.TOPIC_MAPPING_MARSH,
- new MissingMappingRequestMessage(
- item.platformId(),
- item.typeId()),
- GridIoPolicy.SYSTEM_POOL);
+ srvNode,
+ GridTopic.TOPIC_MAPPING_MARSH,
+ new MissingMappingRequestMessage(
+ item.platformId(),
+ item.typeId()),
+ GridIoPolicy.SYSTEM_POOL);
if (discoMgr.node(srvNode.id()) == null)
continue;
@@ -113,21 +113,22 @@
}
catch (IgniteCheckedException ignored) {
U.warn(log,
- "Failed to request marshaller mapping from remote node (proceeding with the next one): "
- + srvNode);
+ "Failed to request marshaller mapping from remote node (proceeding with the next one): "
+ + srvNode);
}
}
noSrvsInCluster = pendingNode == null;
}
- if (noSrvsInCluster)
+ if (noSrvsInCluster) {
onDone(MappingExchangeResult.createFailureResult(
- new IgniteCheckedException(
- "All server nodes have left grid, cannot request mapping [platformId: "
- + item.platformId()
- + "; typeId: "
- + item.typeId() + "]")));
+ new IgniteCheckedException(
+ "All server nodes have left grid, cannot request mapping [platformId: "
+ + item.platformId()
+ + "; typeId: "
+ + item.typeId() + "]")));
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 6231da6..f689573 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -216,7 +216,7 @@
if (fut != null) {
if (resolvedClsName != null) {
- marshallerCtx.onMissedMappingResolved(item, resolvedClsName);
+ marshallerCtx.onMappingAccepted(new MarshallerMappingItem(platformId, typeId, resolvedClsName));
fut.onDone(MappingExchangeResult.createSuccessfulResult(resolvedClsName));
}
@@ -312,6 +312,11 @@
}
});
+ ClientRequestFuture rqFut = clientReqSyncMap.get(new MarshallerMappingItem(item.platformId(), item.typeId(), null));
+
+ if (rqFut != null)
+ rqFut.onDone(MappingExchangeResult.createSuccessfulResult(item.className()));
+
GridFutureAdapter<MappingExchangeResult> fut = mappingExchangeSyncMap.get(item);
if (fut != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
index c13c48e..eae07d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MappedName.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.marshaller;
import java.io.Serializable;
+import java.util.Objects;
import org.apache.ignite.internal.util.typedef.internal.S;
/**
@@ -57,6 +58,24 @@
}
/** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ MappedName name = (MappedName)o;
+
+ return accepted == name.accepted && Objects.equals(clsName, name.clsName);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return Objects.hash(clsName, accepted);
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(MappedName.class, this);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
index 609c537..f715eb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.marshaller;
import java.io.Serializable;
+import java.util.Objects;
import org.jetbrains.annotations.Nullable;
/**
@@ -84,7 +85,7 @@
return platformId == that.platformId
&& typeId == that.typeId
- && (clsName != null ? clsName.equals(that.clsName) : that.clsName == null);
+ && (Objects.equals(clsName, that.clsName));
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
index eb50850..bd0f9b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
@@ -135,27 +135,28 @@
* @param cache Cache.
*/
public GridFutureAdapter<MappingExchangeResult> requestMapping(
- MarshallerMappingItem item,
- ConcurrentMap<Integer, MappedName> cache
+ MarshallerMappingItem item,
+ ConcurrentMap<Integer, MappedName> cache
) {
+ assert ctx.clientNode();
+ assert item.className() == null;
+
ClientRequestFuture newFut = new ClientRequestFuture(ctx, item, clientReqSyncMap);
-
ClientRequestFuture oldFut = clientReqSyncMap.putIfAbsent(item, newFut);
-
- if (oldFut != null)
- return oldFut;
+ ClientRequestFuture resFut = oldFut == null ? newFut : oldFut;
MappedName mappedName = cache.get(item.typeId());
- if (mappedName != null) {
- newFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
-
- return newFut;
+ if (mappedName == null) {
+ if (oldFut == null)
+ newFut.requestMapping();
+ }
+ else {
+ if (mappedName.accepted())
+ resFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
}
- newFut.requestMapping();
-
- return newFut;
+ return resFut;
}
/**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
new file mode 100644
index 0000000..5f19e41
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClientRequestsMappingTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.UnaryOperator;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
+import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage;
+import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
+import org.apache.ignite.internal.processors.marshaller.MarshallerMappingItem;
+import org.apache.ignite.internal.processors.marshaller.MissingMappingResponseMessage;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.events.EventType.EVTS_CACHE;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/** */
+public class IgniteMarshallerCacheClientRequestsMappingTest extends GridCommonAbstractTest {
+ /** Waiting timeout. */
+ private static final long AWAIT_PROCESSING_TIMEOUT_MS = 10_000L;
+
+ /** Limited thread pool size. */
+ private static final int LIMITED_SYSTEM_THREAD_POOL = 4;
+
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** External class loader. */
+ private static final ClassLoader extClsLdr = getExternalClassLoader();
+
+ /** Person class name. */
+ private static final String PERSON_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.Person";
+
+ /** Organization class name. */
+ private static final String ORGANIZATION_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.Organization";
+
+ /** Address class name. */
+ private static final String ADDRESS_CLASS_NAME = "org.apache.ignite.tests.p2p.cache.Address";
+
+ /** Compute job result class name. */
+ private static final String JOB_RESULT_CLASS_NAME_PREFIX = "org.apache.ignite.tests.p2p.compute.ResultV";
+
+ /** Client work directory absolute path. */
+ private String clntWorkDir;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (cfg.isClientMode())
+ cfg.setWorkDirectory(clntWorkDir);
+
+ cfg.setClassLoader(extClsLdr);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+ cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
+ cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setWriteSynchronizationMode(FULL_SYNC));
+ cfg.setIncludeEventTypes(EVTS_CACHE);
+
+ cfg.setSystemThreadPoolSize(LIMITED_SYSTEM_THREAD_POOL);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ clntWorkDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "clnt", true).getAbsolutePath();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ Path path = Paths.get(clntWorkDir, DataStorageConfiguration.DFLT_MARSHALLER_PATH);
+
+ for (File file : Objects.requireNonNull(path.toFile().listFiles()))
+ Files.delete(file.toPath());
+
+ Files.deleteIfExists(path);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDiscoeryMarshallerDelayedWithOverfloodThreadPool() throws Exception {
+ doTestMarshallingBinaryMappingsLoadedFromClient(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testDiscoeryBinaryMetaDelayedWithOverfloodThreadPool() throws Exception {
+ doTestMarshallingBinaryMappingsLoadedFromClient(false);
+ }
+
+ /**
+ * @param receiveMetadataOnClientJoin If {@code true} than binary metadata will exist on the server node and loaded
+ * by the client node on the node join exchange, otherwise it will be requested by client peer-2-peer though the TcpCommunicationSpi.
+ * @throws Exception If fails.
+ */
+ private void doTestMarshallingBinaryMappingsLoadedFromClient(boolean receiveMetadataOnClientJoin) throws Exception {
+ CountDownLatch delayMappingLatch = new CountDownLatch(1);
+ AtomicInteger loadKeys = new AtomicInteger(100);
+ CountDownLatch evtReceiveLatch = new CountDownLatch(1);
+ int initialKeys = receiveMetadataOnClientJoin ? 10 : 0;
+
+ IgniteEx srv1 = startGrid(0);
+
+ TestRecordingCommunicationSpi.spi(srv1)
+ .blockMessages((IgniteBiPredicate<ClusterNode, Message>)(node, msg) -> msg instanceof MissingMappingResponseMessage ||
+ msg instanceof MetadataResponseMessage);
+
+ // Load data pior to the client note starts, so the client will receive the binary metadata on the client node join.
+ for (int i = 0; i < initialKeys; i++)
+ srv1.cache(DEFAULT_CACHE_NAME).put(i, createOrganization(extClsLdr, i));
+
+ Ignite cl1 = startClientGrid(1,
+ (UnaryOperator<IgniteConfiguration>)cfg -> cfg.setDiscoverySpi(new TcpDiscoverySpi() {
+ @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+ if (msg instanceof TcpDiscoveryCustomEventMessage) {
+ try {
+ DiscoverySpiCustomMessage custom =
+ ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader());
+
+ if (custom instanceof CustomMessageWrapper) {
+ DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate();
+
+ if (delegate instanceof MappingAcceptedMessage) {
+ MarshallerMappingItem item = GridTestUtils.getFieldValue(delegate, "item");
+
+ if (item.className().equals(PERSON_CLASS_NAME) ||
+ item.className().equals(ORGANIZATION_CLASS_NAME) ||
+ item.className().equals(ADDRESS_CLASS_NAME)
+ ) {
+ try {
+ U.await(delayMappingLatch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ fail("Mapping proposed message must be released.");
+ }
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ super.startMessageProcess(msg);
+ }
+ }.setIpFinder(IP_FINDER)));
+
+ awaitPartitionMapExchange();
+
+ cl1.events().remoteListen(
+ (IgniteBiPredicate<UUID, Event>)(uuid, evt) -> {
+ info("Event [" + evt.shortDisplay() + ']');
+
+ evtReceiveLatch.countDown();
+
+ return true;
+ },
+ t -> true,
+ EVT_CACHE_OBJECT_PUT);
+
+ // Flood system thread pool with cache events.
+ GridTestUtils.runMultiThreadedAsync((Callable<Boolean>)() -> {
+ int key;
+
+ while ((key = loadKeys.decrementAndGet()) > initialKeys && !Thread.currentThread().isInterrupted())
+ srv1.cache(DEFAULT_CACHE_NAME).put(key, createOrganization(extClsLdr, key));
+
+ return true;
+ }, 8, "cache-adder-thread").get();
+
+ assertTrue(GridTestUtils.waitForCondition(() -> TestRecordingCommunicationSpi.spi(srv1).hasBlockedMessages(),
+ AWAIT_PROCESSING_TIMEOUT_MS));
+
+ delayMappingLatch.countDown();
+
+ assertTrue(U.await(evtReceiveLatch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testBinaryMetaDelayedForComputeJobResult() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ startGrid(0);
+
+ Ignite cl1 = startClientGrid(1, (UnaryOperator<IgniteConfiguration>)cfg ->
+ cfg.setDiscoverySpi(new TcpDiscoverySpi() {
+ @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) {
+ if (msg instanceof TcpDiscoveryCustomEventMessage) {
+ try {
+ DiscoverySpiCustomMessage custom =
+ ((TcpDiscoveryCustomEventMessage)msg).message(marshaller(), U.gridClassLoader());
+
+ if (custom instanceof CustomMessageWrapper) {
+ DiscoveryCustomMessage delegate = ((CustomMessageWrapper)custom).delegate();
+
+ if (delegate instanceof MappingProposedMessage) {
+ MarshallerMappingItem item = GridTestUtils.getFieldValue(delegate, "mappingItem");
+
+ if (item.className().contains(JOB_RESULT_CLASS_NAME_PREFIX)) {
+ try {
+ U.await(latch, AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ fail("Exception must never be thrown: " + e.getMessage());
+ }
+ }
+ }
+ }
+ }
+ catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ super.startMessageProcess(msg);
+ }
+ }.setIpFinder(IP_FINDER)));
+
+ AtomicInteger results = new AtomicInteger(4);
+
+ // Flood system thread pool with task results.
+ IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync((Callable<Boolean>)() -> {
+ int v;
+
+ while ((v = results.decrementAndGet()) >= 0) {
+ int v0 = v;
+
+ Object ignore = cl1.compute().call(() -> createResult(extClsLdr, v0));
+ }
+
+ return true;
+ }, LIMITED_SYSTEM_THREAD_POOL, "compute-thread");
+
+ latch.countDown();
+
+ fut.get(AWAIT_PROCESSING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * @param extClsLdr Class loader.
+ * @param key Cache key.
+ * @return Organization.
+ * @throws Exception If failed.
+ */
+ private static Object createOrganization(ClassLoader extClsLdr, int key) throws Exception {
+ Class<?> personCls = extClsLdr.loadClass(PERSON_CLASS_NAME);
+ Class<?> addrCls = extClsLdr.loadClass(ADDRESS_CLASS_NAME);
+
+ Constructor<?> personConstructor = personCls.getConstructor(String.class);
+ Constructor<?> addrConstructor = addrCls.getConstructor(String.class, Integer.TYPE);
+ Constructor<?> organizationConstructor = extClsLdr.loadClass(ORGANIZATION_CLASS_NAME)
+ .getConstructor(String.class, personCls, addrCls);
+
+ return organizationConstructor.newInstance("Organization " + key,
+ personConstructor.newInstance("Persone name " + key),
+ addrConstructor.newInstance("Street " + key, key));
+ }
+
+ /**
+ * @param extClsLdr Class loader.
+ * @param ver Class type version.
+ * @return Result.
+ * @throws Exception If fails.
+ */
+ public static Object createResult(ClassLoader extClsLdr, int ver) throws Exception {
+ Class<?> resCls = extClsLdr.loadClass(JOB_RESULT_CLASS_NAME_PREFIX + ver);
+
+ return resCls.getConstructor(int.class).newInstance(ver);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index cf53ea1..d61a62e 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1002,7 +1002,7 @@
* @throws Exception If anything failed.
*/
protected IgniteEx startClientGrid() throws Exception {
- return startClientGrid(getTestIgniteInstanceName());
+ return startClientGrid(getTestIgniteInstanceName(), (UnaryOperator<IgniteConfiguration>)null);
}
/**
@@ -1061,6 +1061,16 @@
}
/**
+ * @param idx Index of the grid to start.
+ * @param cfgOp Configuration mutator. Can be used to avoid overcomplification of {@link #getConfiguration()}.
+ * @return Started grid.
+ * @throws Exception If anything failed.
+ */
+ protected IgniteEx startClientGrid(int idx, UnaryOperator<IgniteConfiguration> cfgOp) throws Exception {
+ return startClientGrid(getTestIgniteInstanceName(idx), cfgOp);
+ }
+
+ /**
* Starts new client grid with given index.
*
* @param idx Index of the grid to start.
@@ -1068,7 +1078,7 @@
* @throws Exception If anything failed.
*/
protected IgniteEx startClientGrid(int idx) throws Exception {
- return startClientGrid(getTestIgniteInstanceName(idx));
+ return startClientGrid(getTestIgniteInstanceName(idx), (UnaryOperator<IgniteConfiguration>)null);
}
/**
@@ -1083,7 +1093,7 @@
IgnitionEx.dependencyResolver(rslvr);
try {
- return startClientGrid(getTestIgniteInstanceName(idx));
+ return startClientGrid(getTestIgniteInstanceName(idx), (UnaryOperator<IgniteConfiguration>)null);
}
finally {
IgnitionEx.dependencyResolver(null);
@@ -1098,10 +1108,22 @@
* @throws Exception If anything failed.
*/
protected IgniteEx startClientGrid(String igniteInstanceName) throws Exception {
+ return startClientGrid(igniteInstanceName, (UnaryOperator<IgniteConfiguration>)null);
+ }
+
+ /**
+ * Starts new client grid with given name.
+ *
+ * @param igniteInstanceName Ignite instance name.
+ * @param cfgOp Configuration mutator. Can be used to avoid overcomplification of {@link #getConfiguration()}.
+ * @return Started grid.
+ * @throws Exception If anything failed.
+ */
+ protected IgniteEx startClientGrid(String igniteInstanceName, UnaryOperator<IgniteConfiguration> cfgOp) throws Exception {
IgnitionEx.setClientMode(true);
try {
- return (IgniteEx)startGrid(igniteInstanceName, (GridSpringResourceContext)null);
+ return (IgniteEx)startGrid(igniteInstanceName, cfgOp, null);
}
finally {
IgnitionEx.setClientMode(false);
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
index e2419dc..8492bd3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite2.java
@@ -49,6 +49,7 @@
import org.apache.ignite.internal.processors.cache.GridLongRunningInitNewCrdFutureDiagnosticsTest;
import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClassNameConflictTest;
import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingOnMissTest;
+import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheClientRequestsMappingTest;
import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheFSRestoreTest;
import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheSeparateDirectoryTest;
import org.apache.ignite.internal.processors.cache.RebalanceWithDifferentThreadPoolSizeTest;
@@ -143,6 +144,7 @@
IgniteMarshallerCacheFSRestoreTest.class,
IgniteMarshallerCacheClassNameConflictTest.class,
IgniteMarshallerCacheClientRequestsMappingOnMissTest.class,
+ IgniteMarshallerCacheClientRequestsMappingTest.class,
IgniteMarshallerCacheSeparateDirectoryTest.class,
IgniteDiagnosticMessagesTest.class,
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/Result.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/Result.java
new file mode 100644
index 0000000..cf680e7
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/Result.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+/** */
+public class Result {
+ /** Param. */
+ final int v;
+
+ /**
+ * @param v Param.
+ */
+ Result(int v) {
+ this.v = v;
+ }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV0.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV0.java
new file mode 100644
index 0000000..1495d07
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV0.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+/** */
+public class ResultV0 extends Result {
+ /**
+ * @param v Param.
+ */
+ public ResultV0(int v) {
+ super(v);
+ }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV1.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV1.java
new file mode 100644
index 0000000..d56a31e
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV1.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+/** */
+public class ResultV1 extends Result {
+ /**
+ * @param v Param.
+ */
+ public ResultV1(int v) {
+ super(v);
+ }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV2.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV2.java
new file mode 100644
index 0000000..eb7e531
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV2.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+/** */
+public class ResultV2 extends Result {
+ /**
+ * @param v Param.
+ */
+ public ResultV2(int v) {
+ super(v);
+ }
+}
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV3.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV3.java
new file mode 100644
index 0000000..b0080d1
--- /dev/null
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ResultV3.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+/** */
+public class ResultV3 extends Result {
+ /**
+ * @param v Param.
+ */
+ public ResultV3(int v) {
+ super(v);
+ }
+}