IGNITE-12099 Avoid writing metadata to disk in discovery threads - Fixes #7396.

Signed-off-by: Alexey Goncharuk <alexey.goncharuk@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 76448a8..45e8b57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -188,7 +188,11 @@
 
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(CacheObjectValueContext ctx, ClassLoader ldr) throws IgniteCheckedException {
-        this.ctx = ((CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects()).binaryContext();
+        CacheObjectBinaryProcessorImpl binaryProc = (CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects();
+
+        this.ctx = binaryProc.binaryContext();
+
+        binaryProc.waitMetadataWriteIfNeeded(typeId());
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
index bee4099..924cf14 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataFileStore.java
@@ -18,26 +18,35 @@
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureType;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.binary.BinaryMetadata;
 import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Class handles saving/restoring binary metadata to/from disk.
  *
- * Current implementation needs to be rewritten as it issues IO operations from discovery thread
- * which may lead to segmentation of nodes from cluster.
+ * Current implementation needs to be rewritten as it issues IO operations from discovery thread which may lead to
+ * segmentation of nodes from cluster.
  */
 class BinaryMetadataFileStore {
     /** Link to resolved binary metadata directory. Null for non persistent mode */
@@ -50,16 +59,23 @@
     private final GridKernalContext ctx;
 
     /** */
+    private final boolean isPersistenceEnabled;
+
+    /** */
     private FileIOFactory fileIOFactory;
 
     /** */
     private final IgniteLogger log;
 
+    /** */
+    private BinaryMetadataAsyncWriter writer;
+
     /**
      * @param metadataLocCache Metadata locale cache.
      * @param ctx Context.
      * @param log Logger.
-     * @param binaryMetadataFileStoreDir Path to binary metadata store configured by user, should include binary_meta and consistentId
+     * @param binaryMetadataFileStoreDir Path to binary metadata store configured by user, should include binary_meta
+     * and consistentId
      */
     BinaryMetadataFileStore(
         final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache,
@@ -69,6 +85,7 @@
     ) throws IgniteCheckedException {
         this.metadataLocCache = metadataLocCache;
         this.ctx = ctx;
+        this.isPersistenceEnabled = CU.isPersistenceEnabled(ctx.config());
         this.log = log;
 
         if (!CU.isPersistenceEnabled(ctx.config()))
@@ -90,13 +107,23 @@
         }
 
         U.ensureDirectory(workDir, "directory for serialized binary metadata", log);
+
+        writer = new BinaryMetadataAsyncWriter();
+        new IgniteThread(writer).start();
+    }
+
+    /**
+     * Stops worker for async writing of binary metadata.
+     */
+    void stop() {
+        U.cancel(writer);
     }
 
     /**
      * @param binMeta Binary metadata to be written to disk.
      */
     void writeMetadata(BinaryMetadata binMeta) {
-        if (!CU.isPersistenceEnabled(ctx.config()))
+        if (!isPersistenceEnabled)
             return;
 
         try {
@@ -118,6 +145,8 @@
 
             U.error(log, msg);
 
+            writer.cancel();
+
             ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
 
             throw new IgniteException(msg, e);
@@ -128,7 +157,7 @@
      * Restores metadata on startup of {@link CacheObjectBinaryProcessorImpl} but before starting discovery.
      */
     void restoreMetadata() {
-        if (!CU.isPersistenceEnabled(ctx.config()))
+        if (!isPersistenceEnabled)
             return;
 
         for (File file : workDir.listFiles()) {
@@ -145,9 +174,8 @@
     }
 
     /**
-     * Checks if binary metadata for the same typeId is already presented on disk.
-     * If so merges it with new metadata and stores the result.
-     * Otherwise just writes new metadata.
+     * Checks if binary metadata for the same typeId is already presented on disk. If so merges it with new metadata and
+     * stores the result. Otherwise just writes new metadata.
      *
      * @param binMeta new binary metadata to write to disk.
      */
@@ -158,7 +186,8 @@
             BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(existingMeta, binMeta);
 
             writeMetadata(mergedMeta);
-        } else
+        }
+        else
             writeMetadata(binMeta);
     }
 
@@ -183,4 +212,317 @@
 
         return null;
     }
+
+    /**
+     *
+     */
+    void prepareMetadataWriting(BinaryMetadata meta, int typeVer) {
+        if (!isPersistenceEnabled)
+            return;
+
+        writer.prepareWriteFuture(meta, typeVer);
+    }
+
+    /**
+     * @param typeId Type ID.
+     * @param typeVer Type version.
+     */
+    void writeMetadataAsync(int typeId, int typeVer) {
+        if (!isPersistenceEnabled)
+            return;
+
+        writer.startWritingAsync(typeId, typeVer);
+    }
+
+    /**
+     * {@code typeVer} parameter is always non-negative except one special case
+     * (see {@link CacheObjectBinaryProcessorImpl#addMeta(int, BinaryType, boolean)} for context):
+     * if request for bin meta update arrives right at the moment when node is stopping
+     * {@link MetadataUpdateResult} of special type is generated: UPDATE_DISABLED.
+     *
+     * @param typeId
+     * @param typeVer
+     * @throws IgniteCheckedException
+     */
+    void waitForWriteCompletion(int typeId, int typeVer) throws IgniteCheckedException {
+        if (!isPersistenceEnabled)
+            return;
+
+        writer.waitForWriteCompletion(typeId, typeVer);
+    }
+
+    /**
+     * @param typeId Binary metadata type id.
+     * @param typeVer Type version.
+     */
+    void finishWrite(int typeId, int typeVer) {
+        if (!isPersistenceEnabled)
+            return;
+
+        writer.finishWriteFuture(typeId, typeVer);
+    }
+
+    /**
+     *
+     */
+    private class BinaryMetadataAsyncWriter extends GridWorker {
+        /**
+         * Queue of write tasks submitted for execution.
+         */
+        private final BlockingQueue<WriteOperationTask> queue = new LinkedBlockingQueue<>();
+
+        /**
+         * Write operation tasks prepared for writing (but not yet submitted to execution (actual writing).
+         */
+        private final ConcurrentMap<OperationSyncKey, WriteOperationTask> preparedWriteTasks = new ConcurrentHashMap<>();
+
+        /** */
+        BinaryMetadataAsyncWriter() {
+            super(ctx.igniteInstanceName(), "binary-metadata-writer", BinaryMetadataFileStore.this.log, ctx.workersRegistry());
+        }
+
+        /**
+         * @param typeId Type ID.
+         * @param typeVer Type version.
+         */
+        synchronized void startWritingAsync(int typeId, int typeVer) {
+            if (isCancelled())
+                return;
+
+            WriteOperationTask task = preparedWriteTasks.get(new OperationSyncKey(typeId, typeVer));
+
+            if (task != null) {
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Submitting task for async write for" +
+                            " [typeId=" + typeId +
+                            ", typeVersion=" + typeVer + ']'
+                    );
+
+                queue.add(task);
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Task for async write for" +
+                            " [typeId=" + typeId +
+                            ", typeVersion=" + typeVer + "] not found"
+                    );
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void cancel() {
+            super.cancel();
+
+            queue.clear();
+
+            IgniteCheckedException err = new IgniteCheckedException("Operation has been cancelled (node is stopping).");
+
+            for (Map.Entry<OperationSyncKey, WriteOperationTask> e : preparedWriteTasks.entrySet()) {
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Cancelling future for write operation for" +
+                            " [typeId=" + e.getKey().typeId +
+                            ", typeVer=" + e.getKey().typeVer + ']'
+                    );
+
+                e.getValue().future.onDone(err);
+            }
+
+            preparedWriteTasks.clear();
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            while (!isCancelled()) {
+                try {
+                    body0();
+                }
+                catch (InterruptedException e) {
+                    if (!isCancelled) {
+                        ctx.failure().process(new FailureContext(FailureType.SYSTEM_WORKER_TERMINATION, e));
+
+                        throw e;
+                    }
+                }
+            }
+        }
+
+        /** */
+        private void body0() throws InterruptedException {
+            WriteOperationTask task;
+
+            blockingSectionBegin();
+
+            try {
+                task = queue.take();
+
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Starting write operation for" +
+                            " [typeId=" + task.meta.typeId() +
+                            ", typeVer=" + task.typeVer + ']'
+                    );
+
+                writeMetadata(task.meta);
+            }
+            finally {
+                blockingSectionEnd();
+            }
+
+            finishWriteFuture(task.meta.typeId(), task.typeVer);
+        }
+
+        /**
+         * @param typeId Binary metadata type id.
+         * @param typeVer Type version.
+         */
+        void finishWriteFuture(int typeId, int typeVer) {
+            WriteOperationTask task = preparedWriteTasks.remove(new OperationSyncKey(typeId, typeVer));
+
+            if (task != null) {
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Future for write operation for" +
+                            " [typeId=" + typeId +
+                            ", typeVer=" + typeVer + ']' +
+                            " completed."
+                    );
+
+                task.future.onDone();
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Future for write operation for" +
+                            " [typeId=" + typeId +
+                            ", typeVer=" + typeVer + ']' +
+                            " not found."
+                    );
+            }
+        }
+
+        /**
+         * @param meta Binary metadata.
+         * @param typeVer Type version.
+         */
+        synchronized void prepareWriteFuture(BinaryMetadata meta, int typeVer) {
+            if (isCancelled())
+                return;
+
+            if (log.isDebugEnabled())
+                log.debug(
+                    "Prepare task for async write for" +
+                        "[typeName=" + meta.typeName() +
+                        ", typeId=" + meta.typeId() +
+                        ", typeVersion=" + typeVer + ']'
+                );
+
+            preparedWriteTasks.putIfAbsent(new OperationSyncKey(meta.typeId(), typeVer), new WriteOperationTask(meta, typeVer));
+        }
+
+        /**
+         * @param typeId Type ID.
+         * @param typeVer Type version.
+         * @throws IgniteCheckedException If write operation failed.
+         */
+        void waitForWriteCompletion(int typeId, int typeVer) throws IgniteCheckedException {
+            //special case, see javadoc of {@link BinaryMetadataFileStore#waitForWriteCompletion}
+            if (typeVer == -1) {
+                if (log.isDebugEnabled())
+                    log.debug("No need to wait for " + typeId + ", negative typeVer was passed.");
+
+                return;
+            }
+
+            WriteOperationTask task = preparedWriteTasks.get(new OperationSyncKey(typeId, typeVer));
+
+            if (task != null) {
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Waiting for write completion of" +
+                            " [typeId=" + typeId +
+                            ", typeVer=" + typeVer + "]"
+                    );
+
+                try {
+                    task.future.get();
+                }
+                finally {
+                    if (log.isDebugEnabled())
+                        log.debug(
+                            "Released for write completion of" +
+                                " [typeId=" + typeId +
+                                ", typeVer=" + typeVer + ']'
+                        );
+                }
+            }
+            else {
+                if (log.isDebugEnabled())
+                    log.debug(
+                        "Task for async write for" +
+                            " [typeId=" + typeId +
+                            ", typeVersion=" + typeVer + "] not found"
+                    );
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    private static final class WriteOperationTask {
+        /** */
+        private final BinaryMetadata meta;
+
+        /** */
+        private final int typeVer;
+
+        /** */
+        private final GridFutureAdapter future = new GridFutureAdapter();
+
+        /** */
+        private WriteOperationTask(BinaryMetadata meta, int ver) {
+            this.meta = meta;
+            typeVer = ver;
+        }
+    }
+
+    /**
+     *
+     */
+    private static final class OperationSyncKey {
+        /** */
+        private final int typeId;
+
+        /** */
+        private final int typeVer;
+
+        /** */
+        private OperationSyncKey(int typeId, int typeVer) {
+            this.typeId = typeId;
+            this.typeVer = typeVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * typeId + typeVer;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            if (!(obj instanceof OperationSyncKey))
+                return false;
+
+            OperationSyncKey that = (OperationSyncKey)obj;
+
+            return (that.typeId == typeId) && (that.typeVer == typeVer);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(OperationSyncKey.class, this);
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 3a95586..7b89632 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -142,7 +142,7 @@
         if (clientNode)
             ctx.event().addLocalEventListener(new GridLocalEventListener() {
                 @Override public void onEvent(Event evt) {
-                    DiscoveryEvent evt0 = (DiscoveryEvent) evt;
+                    DiscoveryEvent evt0 = (DiscoveryEvent)evt;
 
                     if (!ctx.isStopping()) {
                         for (ClientMetadataRequestFuture fut : clientReqSyncMap.values())
@@ -198,13 +198,13 @@
             .map(BinaryMetadataHolder::metadata)
             .orElse(null);
 
-        Set<Integer> changedSchemas  = new LinkedHashSet<>();
+        Set<Integer> changedSchemas = new LinkedHashSet<>();
 
         //Ensure after putting pending future, metadata still has difference.
         BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta, changedSchemas);
 
         if (mergedMeta == oldMeta) {
-            resFut.onDone(MetadataUpdateResult.createSuccessfulResult());
+            resFut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
 
             return null;
         }
@@ -282,13 +282,14 @@
         BinaryMetadataHolder holder = metaLocCache.get(typeId);
 
         if (holder.acceptedVersion() >= ver)
-            resFut.onDone(MetadataUpdateResult.createSuccessfulResult());
+            resFut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
 
         return resFut;
     }
 
     /**
      * Await specific schema update.
+     *
      * @param typeId Type id.
      * @param schemaId Schema id.
      * @return Future which will be completed when schema is received.
@@ -303,8 +304,8 @@
     }
 
     /**
-     * Allows client node to request latest version of binary metadata for a given typeId from the cluster
-     * in case client is able to detect that it has obsolete metadata in its local cache.
+     * Allows client node to request latest version of binary metadata for a given typeId from the cluster in case
+     * client is able to detect that it has obsolete metadata in its local cache.
      *
      * @param typeId ID of binary type.
      * @return future to wait for request arrival on.
@@ -354,7 +355,8 @@
     private final class MetadataUpdateProposedListener implements CustomEventListener<MetadataUpdateProposedMessage> {
 
         /** {@inheritDoc} */
-        @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateProposedMessage msg) {
+        @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+            MetadataUpdateProposedMessage msg) {
             if (log.isDebugEnabled())
                 log.debug("Received MetadataUpdateProposedListener [typeId=" + msg.typeId() +
                     ", typeName=" + msg.metadata().typeName() +
@@ -392,10 +394,10 @@
 
                     if (log.isDebugEnabled())
                         log.debug("Versions are stamped on coordinator" +
-                                " [typeId=" + typeId +
-                                ", changedSchemas=" + changedSchemas +
-                                ", pendingVer=" + pendingVer +
-                                ", acceptedVer=" + acceptedVer + "]"
+                            " [typeId=" + typeId +
+                            ", changedSchemas=" + changedSchemas +
+                            ", pendingVer=" + pendingVer +
+                            ", acceptedVer=" + acceptedVer + "]"
                         );
 
                     msg.metadata(mergedMeta);
@@ -429,13 +431,13 @@
                                 holder = metaLocCache.get(typeId);
 
                                 if (obsoleteUpdate(
-                                        holder.pendingVersion(),
-                                        holder.acceptedVersion(),
-                                        pendingVer,
-                                        acceptedVer)) {
+                                    holder.pendingVersion(),
+                                    holder.acceptedVersion(),
+                                    pendingVer,
+                                    acceptedVer)) {
                                     obsoleteUpd = true;
 
-                                    fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+                                    fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
 
                                     break;
                                 }
@@ -457,6 +459,8 @@
                             log.debug("Updated metadata on originating node: " + newHolder);
 
                         metaLocCache.put(typeId, newHolder);
+
+                        metadataFileStore.prepareMetadataWriting(msg.metadata(), pendingVer);
                     }
                 }
             }
@@ -479,13 +483,14 @@
                                     holder = metaLocCache.get(typeId);
 
                                     if (obsoleteUpdate(
-                                            holder.pendingVersion(),
-                                            holder.acceptedVersion(),
-                                            pendingVer,
-                                            acceptedVer))
+                                        holder.pendingVersion(),
+                                        holder.acceptedVersion(),
+                                        pendingVer,
+                                        acceptedVer))
                                         break;
 
-                                } while (!metaLocCache.replace(typeId, holder, newHolder));
+                                }
+                                while (!metaLocCache.replace(typeId, holder, newHolder));
                             }
                         }
                         else {
@@ -494,6 +499,8 @@
                                     ", changedSchemas=" + changedSchemas + ']');
 
                             metaLocCache.put(typeId, newHolder);
+
+                            metadataFileStore.prepareMetadataWriting(mergedMeta, pendingVer);
                         }
                     }
                     catch (BinaryObjectException ignored) {
@@ -537,7 +544,8 @@
     private final class MetadataUpdateAcceptedListener implements CustomEventListener<MetadataUpdateAcceptedMessage> {
 
         /** {@inheritDoc} */
-        @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateAcceptedMessage msg) {
+        @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd,
+            MetadataUpdateAcceptedMessage msg) {
             if (log.isDebugEnabled())
                 log.debug("Received MetadataUpdateAcceptedMessage " + msg);
 
@@ -554,7 +562,7 @@
 
             if (clientNode) {
                 BinaryMetadataHolder newHolder = new BinaryMetadataHolder(holder.metadata(),
-                        holder.pendingVersion(), newAcceptedVer);
+                    holder.pendingVersion(), newAcceptedVer);
 
                 do {
                     holder = metaLocCache.get(typeId);
@@ -577,10 +585,12 @@
                     //this is duplicate ack
                     msg.duplicated(true);
 
+                    metadataFileStore.finishWrite(typeId, newAcceptedVer);
+
                     return;
                 }
 
-                metadataFileStore.writeMetadata(holder.metadata());
+                metadataFileStore.writeMetadataAsync(typeId, newAcceptedVer);
 
                 metaLocCache.put(typeId, new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), newAcceptedVer));
             }
@@ -612,13 +622,13 @@
             }
 
             if (fut != null)
-                fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+                fut.onDone(MetadataUpdateResult.createSuccessfulResult(newAcceptedVer));
         }
     }
 
     /**
-     * Future class responsible for blocking threads until particular events with metadata updates happen,
-     * e.g. arriving {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response.
+     * Future class responsible for blocking threads until particular events with metadata updates happen, e.g. arriving
+     * {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response.
      */
     public final class MetadataUpdateResultFuture extends GridFutureAdapter<MetadataUpdateResult> {
         /** */
@@ -657,6 +667,11 @@
             this.key = key;
         }
 
+        /** */
+        public int typeVersion() {
+            return key.ver;
+        }
+
         /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(MetadataUpdateResultFuture.class, this);
@@ -664,8 +679,8 @@
     }
 
     /**
-     * Key for mapping arriving {@link MetadataUpdateAcceptedMessage} messages
-     * to {@link MetadataUpdateResultFuture}s other threads may be waiting on.
+     * Key for mapping arriving {@link MetadataUpdateAcceptedMessage} messages to {@link MetadataUpdateResultFuture}s
+     * other threads may be waiting on.
      */
     private static final class SyncKey {
         /** */
@@ -710,7 +725,7 @@
             if (!(o instanceof SyncKey))
                 return false;
 
-            SyncKey that = (SyncKey) o;
+            SyncKey that = (SyncKey)o;
 
             return (typeId == that.typeId) && (ver == that.ver);
         }
@@ -739,7 +754,7 @@
         @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg instanceof MetadataRequestMessage : msg;
 
-            MetadataRequestMessage msg0 = (MetadataRequestMessage) msg;
+            MetadataRequestMessage msg0 = (MetadataRequestMessage)msg;
 
             int typeId = msg0.typeId();
 
@@ -783,7 +798,7 @@
         @Override public void onMessage(UUID nodeId, Object msg, byte plc) {
             assert msg instanceof MetadataResponseMessage : msg;
 
-            MetadataResponseMessage msg0 = (MetadataResponseMessage) msg;
+            MetadataResponseMessage msg0 = (MetadataResponseMessage)msg;
 
             int typeId = msg0.typeId();
 
@@ -795,7 +810,7 @@
                 return;
 
             if (msg0.metadataNotFound()) {
-                fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+                fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
 
                 return;
             }
@@ -811,16 +826,16 @@
 
                         // typeId metadata cannot be removed after initialization.
                         if (obsoleteUpdate(
-                                oldHolder.pendingVersion(),
-                                oldHolder.acceptedVersion(),
-                                newHolder.pendingVersion(),
-                                newHolder.acceptedVersion()))
+                            oldHolder.pendingVersion(),
+                            oldHolder.acceptedVersion(),
+                            newHolder.pendingVersion(),
+                            newHolder.acceptedVersion()))
                             break;
                     }
                     while (!metaLocCache.replace(typeId, oldHolder, newHolder));
                 }
 
-                fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+                fut.onDone(MetadataUpdateResult.createSuccessfulResult(-1));
             }
             catch (IgniteCheckedException e) {
                 fut.onDone(MetadataUpdateResult.createFailureResult(new BinaryObjectException(e)));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index ac2d237..b5dabcb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -324,6 +324,9 @@
     @Override public void stop(boolean cancel) {
         if (transport != null)
             transport.stop();
+
+        if (metadataFileStore != null)
+            metadataFileStore.stop();
     }
 
     /** {@inheritDoc} */
@@ -575,6 +578,8 @@
 
             if (res.rejected())
                 throw res.error();
+            else if (!ctx.clientNode())
+                metadataFileStore.waitForWriteCompletion(typeId, res.typeVersion());
         }
         catch (IgniteCheckedException e) {
             IgniteCheckedException ex = e;
@@ -649,6 +654,30 @@
     }
 
     /**
+     * Forces caller thread to wait for binary metadata write operation for given type ID.
+     *
+     * In case of in-memory mode this method becomes a No-op as no binary metadata is written to disk in this mode.
+     *
+     * @param typeId ID of binary type to wait for metadata write operation.
+     */
+    public void waitMetadataWriteIfNeeded(final int typeId) {
+        if (metadataFileStore == null)
+            return;
+
+        BinaryMetadataHolder hldr = metadataLocCache.get(typeId);
+
+        if (hldr != null) {
+            try {
+                metadataFileStore.waitForWriteCompletion(typeId, hldr.pendingVersion());
+            }
+            catch (IgniteCheckedException e) {
+                log.warning("Failed to wait for metadata write operation for [typeId=" + typeId +
+                    ", typeVer=" + hldr.acceptedVersion() + ']', e);
+            }
+        }
+    }
+
+    /**
      * @param typeId Type ID.
      * @return Metadata.
      * @throws IgniteException In case of error.
@@ -691,6 +720,17 @@
                     // No-op.
                 }
             }
+            else if (metadataFileStore != null) {
+                try {
+                    metadataFileStore.waitForWriteCompletion(typeId, holder.pendingVersion());
+                }
+                catch (IgniteCheckedException e) {
+                    log.warning("Failed to wait for metadata write operation for [typeId=" + typeId +
+                        ", typeVer=" + holder.acceptedVersion() + ']', e);
+
+                    return null;
+                }
+            }
 
             return holder.metadata();
         }
@@ -805,6 +845,18 @@
             }
         }
 
+        if (holder != null && metadataFileStore != null) {
+            try {
+                metadataFileStore.waitForWriteCompletion(typeId, holder.pendingVersion());
+            }
+            catch (IgniteCheckedException e) {
+                log.warning("Failed to wait for metadata write operation for [typeId=" + typeId +
+                    ", typeVer=" + holder.acceptedVersion() + ']', e);
+
+                return null;
+            }
+        }
+
         return holder != null ? holder.metadata().wrap(binaryCtx) : null;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
index 6c299ab..f6d0d9d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
@@ -28,13 +28,18 @@
     /** */
     private final BinaryObjectException error;
 
+    /** */
+    private final int typeVer;
+
     /**
      * @param resType Response type.
      * @param error Error.
+     * @param typeVer Accepted version of updated type.
      */
-    private MetadataUpdateResult(ResultType resType, BinaryObjectException error) {
+    private MetadataUpdateResult(ResultType resType, BinaryObjectException error, int typeVer) {
         this.resType = resType;
         this.error = error;
+        this.typeVer = typeVer;
     }
 
     /**
@@ -51,11 +56,16 @@
         return error;
     }
 
+    /** */
+    int typeVersion() {
+        return typeVer;
+    }
+
     /**
-     *
+     * @param typeVer Accepted version of updated BinaryMetadata type or <code>-1</code> if not applicable.
      */
-    static MetadataUpdateResult createSuccessfulResult() {
-        return new MetadataUpdateResult(ResultType.SUCCESS, null);
+    static MetadataUpdateResult createSuccessfulResult(int typeVer) {
+        return new MetadataUpdateResult(ResultType.SUCCESS, null, typeVer);
     }
 
     /**
@@ -64,14 +74,14 @@
     static MetadataUpdateResult createFailureResult(BinaryObjectException err) {
         assert err != null;
 
-        return new MetadataUpdateResult(ResultType.REJECT, err);
+        return new MetadataUpdateResult(ResultType.REJECT, err, -1);
     }
 
     /**
      *
      */
     static MetadataUpdateResult createUpdateDisabledResult() {
-        return new MetadataUpdateResult(ResultType.UPDATE_DISABLED, null);
+        return new MetadataUpdateResult(ResultType.UPDATE_DISABLED, null, -1);
     }
 
     /**
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index db821f1..6abb02e 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -123,6 +123,7 @@
 org.apache.ignite.client.SslMode
 org.apache.ignite.client.SslProtocol
 org.apache.ignite.cluster.ClusterGroupEmptyException
+org.apache.ignite.cluster.ClusterState
 org.apache.ignite.cluster.ClusterTopologyException
 org.apache.ignite.compute.ComputeExecutionRejectedException
 org.apache.ignite.compute.ComputeJob
@@ -1313,6 +1314,7 @@
 org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor$BaselineStateAndHistoryData
 org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor$CheckGlobalStateComputeRequest
 org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor$ClientChangeGlobalStateComputeRequest
+org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor$ClientSetGlobalStateComputeRequest
 org.apache.ignite.internal.processors.cluster.baseline.autoadjust.BaselineAutoAdjustStatus$TaskState
 org.apache.ignite.internal.processors.configuration.distributed.DetachedPropertyException
 org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationProcessor$AllowableAction
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java
new file mode 100644
index 0000000..219f9ae
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsBinaryMetadataAsyncWritingTest.java
@@ -0,0 +1,754 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.OpenOption;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.testframework.GridTestUtils.suppressException;
+
+/**
+ * Tests for verification of binary metadata async writing to disk.
+ */
+public class IgnitePdsBinaryMetadataAsyncWritingTest extends GridCommonAbstractTest {
+    /** */
+    private static final AtomicReference<CountDownLatch> fileWriteLatchRef = new AtomicReference<>(null);
+
+    /** */
+    private FileIOFactory specialFileIOFactory;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (igniteInstanceName.contains("client")) {
+            cfg.setClientMode(true);
+
+            return cfg;
+        }
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                )
+                .setFileIOFactory(
+                    specialFileIOFactory != null ? specialFileIOFactory : new RandomAccessFileIOFactory()
+                )
+        );
+
+        cfg.setCacheConfiguration(
+            new CacheConfiguration(DEFAULT_CACHE_NAME)
+                .setBackups(1)
+            .setAffinity(new RendezvousAffinityFunction(false, 16))
+        );
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        if (fileWriteLatchRef != null && fileWriteLatchRef.get() != null)
+            fileWriteLatchRef.get().countDown();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Verifies that registration of new binary meta does not block discovery thread
+     * and new node can join the cluster when binary metadata is in the process of writing.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNodeJoinIsNotBlockedByAsyncMetaWriting() throws Exception {
+        final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+
+        Ignite ig = startGrid(0);
+        ig.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
+        GridTestUtils.runAsync(() -> cache.put(0, new TestAddress(0, "USA", "NYC", "Park Ave")));
+
+        specialFileIOFactory = null;
+
+        startGrid(1);
+        waitForTopology(2);
+
+        fileWriteLatch.countDown();
+    }
+
+    /**
+     * Verifies that metadata is restored on node join even if it was deleted when the node was down.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testBinaryMetadataIsRestoredAfterDeletionOnNodeJoin() throws Exception {
+        IgniteEx ig0 = startGrid(0);
+        IgniteEx ig1 = startGrid(1);
+
+        ig0.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig0.cache(DEFAULT_CACHE_NAME);
+        int key = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode());
+        cache.put(key, new TestAddress(0, "USA", "NYC", "Park Ave"));
+
+        String ig1ConsId = ig1.localNode().consistentId().toString();
+        stopGrid(1);
+        cleanBinaryMetaFolderForNode(ig1ConsId);
+
+        ig1 = startGrid(1);
+        stopGrid(0);
+
+        cache = ig1.cache(DEFAULT_CACHE_NAME);
+        TestAddress addr = (TestAddress)cache.get(key);
+        assertNotNull(addr);
+        assertEquals("USA", addr.country);
+    }
+
+    /**
+     * Verifies that request adding/modifying binary metadata (e.g. put to cache a new value)
+     * is blocked until write to disk is finished.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testThreadRequestingUpdateBlockedTillWriteCompletion() throws Exception {
+        final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+
+        Ignite ig = startGrid();
+
+        ig.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
+
+        GridTestUtils.runAsync(() -> cache.put(1, new TestPerson(0, "John", "Oliver")));
+
+        assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+        fileWriteLatch.countDown();
+
+        assertTrue(GridTestUtils.waitForCondition(() -> cache.size(CachePeekMode.PRIMARY) == 1, 10_000));
+    }
+
+    /**
+     * @throws Exception
+     */
+    @Test
+    public void testDiscoveryIsNotBlockedOnMetadataWrite() throws Exception {
+        final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+
+        IgniteKernal ig = (IgniteKernal)startGrid();
+
+        ig.cluster().active(true);
+
+        IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
+
+        TestAddress addr = new TestAddress(0, "RUS", "Spb", "Nevsky");
+        TestPerson person = new TestPerson(0, "John", "Oliver");
+        person.address(addr);
+        TestAccount account = new TestAccount(person, 0, 1000);
+
+        GridTestUtils.runAsync(() -> cache.put(0, addr));
+        GridTestUtils.runAsync(() -> cache.put(0, person));
+        GridTestUtils.runAsync(() -> cache.put(0, account));
+
+        assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+        Map locCache = GridTestUtils.getFieldValue(
+            (CacheObjectBinaryProcessorImpl)ig.context().cacheObjects(), "metadataLocCache");
+
+        assertTrue(GridTestUtils.waitForCondition(() -> locCache.size() == 3, 5_000));
+
+        fileWriteLatch.countDown();
+    }
+
+    /**
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNodeIsStoppedOnExceptionDuringStoringMetadata() throws Exception {
+        IgniteEx ig0 = startGrid(0);
+
+        specialFileIOFactory = new FailingFileIOFactory(new RandomAccessFileIOFactory());
+
+        IgniteEx ig1 = startGrid(1);
+
+        ig0.cluster().active(true);
+
+        int ig1Key = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode());
+
+        IgniteCache<Object, Object> cache = ig0.cache(DEFAULT_CACHE_NAME);
+
+        cache.put(ig1Key, new TestAddress(0, "USA", "NYC", "6th Ave"));
+
+        waitForTopology(1);
+    }
+
+    /**
+     * Verifies that no updates are applied to cache on node until all metadata write operations
+     * for updated type are fully written to disk.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testParallelUpdatesToBinaryMetadata() throws Exception {
+        IgniteEx ig0 = startGrid(0);
+
+        final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+        IgniteEx ig1 = startGrid(1);
+
+        specialFileIOFactory = null;
+        IgniteEx ig2 = startGrid(2);
+
+        ig0.cluster().active(true);
+
+        int key0 = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode());
+        int key1 = findAffinityKeyForNode(ig0.affinity(DEFAULT_CACHE_NAME), ig1.localNode(), key0);
+
+        assertTrue(key0 != key1);
+
+        GridTestUtils.runAsync(() -> ig0.cache(DEFAULT_CACHE_NAME).put(key0, new TestAddress(key0, "Russia", "Moscow")));
+        GridTestUtils.runAsync(() -> ig2.cache(DEFAULT_CACHE_NAME).put(key1, new TestAddress(key1, "USA", "NYC", "Park Ave")));
+
+        assertEquals(0, ig0.cache(DEFAULT_CACHE_NAME).size(CachePeekMode.PRIMARY));
+
+        fileWriteLatch.countDown();
+
+        assertTrue(GridTestUtils.
+            waitForCondition(() -> ig0.cache(DEFAULT_CACHE_NAME).size(CachePeekMode.PRIMARY) == 2, 10_000));
+
+        stopGrid(0);
+        stopGrid(2);
+
+        IgniteCache<Object, Object> cache = ig1.cache(DEFAULT_CACHE_NAME);
+        TestAddress addr0 = (TestAddress)cache.get(key0);
+        TestAddress addr1 = (TestAddress)cache.get(key1);
+
+        assertEquals("Russia", addr0.country);
+        assertEquals("USA", addr1.country);
+    }
+
+    /**
+     * Verifies that put(key) method called from client on cache in FULL_SYNC mode returns only when
+     * all affinity nodes for this key finished writing binary metadata.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPutRequestFromClientIsBlockedIfBinaryMetaWriteIsHanging() throws Exception {
+        String cacheName = "testCache";
+
+        CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
+            .setBackups(2)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setWriteSynchronizationMode(FULL_SYNC);
+
+        IgniteEx ig0 = startGrid(0);
+        IgniteEx cl0 = startGrid("client0");
+
+        CountDownLatch fileWriteLatch = new CountDownLatch(1);
+        IgniteEx ig1 = startGrid(1);
+
+        ig1.context().discovery().setCustomEventListener(
+            MetadataUpdateAcceptedMessage.class,
+            (topVer, snd, msg) -> suppressException(fileWriteLatch::await)
+        );
+
+        ListeningTestLogger listeningLog = new ListeningTestLogger(true, log);
+        LogListener waitingForWriteLsnr = LogListener.matches("Waiting for write completion of").build();
+        listeningLog.registerListener(waitingForWriteLsnr);
+
+        startGrid(2);
+
+        listeningLog = null;
+
+        ig0.cluster().active(true);
+        IgniteCache cache0 = cl0.createCache(testCacheCfg);
+
+        int key0 = findAffinityKeyForNode(ig0.affinity(cacheName), ig0.localNode());
+
+        AtomicBoolean putFinished = new AtomicBoolean(false);
+
+        GridTestUtils.runAsync(() -> {
+            cache0.put(key0, new TestAddress(key0, "Russia", "Saint-Petersburg"));
+
+            putFinished.set(true);
+        });
+
+        assertFalse(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
+
+        fileWriteLatch.countDown();
+
+        assertTrue(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
+    }
+
+    /**
+     * Verifies that put(key) method called from non-affinity server on cache in FULL_SYNC mode returns only when
+     * all affinity nodes for this key finished writing binary metadata.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPutRequestFromServerIsBlockedIfBinaryMetaWriteIsHanging() throws Exception {
+        putRequestFromServer(true);
+    }
+
+    /**
+     * Verifies that put from client to ATOMIC cache in PRIMARY_SYNC mode is not blocked
+     * if binary metadata async write operation hangs on backup node and not on primary.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPutRequestFromClientCompletesIfMetadataWriteHangOnBackup() throws Exception {
+        String cacheName = "testCache";
+
+        CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
+            .setBackups(2)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setWriteSynchronizationMode(PRIMARY_SYNC);
+
+        IgniteEx ig0 = startGrid(0);
+
+        CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+        IgniteEx ig1 = startGrid(1);
+
+        specialFileIOFactory = null;
+        IgniteEx ig2 = startGrid(2);
+
+        ig0.cluster().active(true);
+
+        IgniteEx cl0 = startGrid("client0");
+
+        IgniteCache cache = cl0.createCache(testCacheCfg);
+        Affinity<Object> aff = cl0.affinity(cacheName);
+
+        AtomicBoolean putCompleted = new AtomicBoolean(false);
+        int key = findAffinityKeyForNode(aff, ig0.localNode());
+        GridTestUtils.runAsync(() -> {
+            cache.put(key, new TestAddress(key, "USA", "NYC"));
+
+            putCompleted.set(true);
+        });
+
+        assertTrue(GridTestUtils.waitForCondition(() -> putCompleted.get(), 5_000));
+
+        fileWriteLatch.countDown();
+    }
+
+    /**
+     * Verifies that put from server to ATOMIC cache in PRIMARY_SYNC mode is not blocked
+     * if binary metadata async write operation hangs on backup node and not on primary.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPutRequestFromServerCompletesIfMetadataWriteHangOnBackup() throws Exception {
+        putRequestFromServer(false);
+    }
+
+    /**
+     * Verifies that metadata write hanging on non-affinity node doesn't block on-going put operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPutRequestFromClientCompletesIfMetadataWriteHangOnNonAffinityNode() throws Exception {
+        String cacheName = "testCache";
+
+        CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
+            .setBackups(1)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setWriteSynchronizationMode(FULL_SYNC);
+
+        IgniteEx ig0 = startGrid(0);
+
+        CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+        IgniteEx ig1 = startGrid(1);
+
+        specialFileIOFactory = null;
+        IgniteEx ig2 = startGrid(2);
+
+        IgniteEx cl0 = startGrid("client0");
+        cl0.cluster().state(ClusterState.ACTIVE);
+
+        IgniteCache cache = cl0.createCache(testCacheCfg);
+        Affinity<Object> aff = cl0.affinity(cacheName);
+        int nonAffKey = findNonAffinityKeyForNode(aff, ig1.localNode(), 0);
+
+        AtomicBoolean putCompleted = new AtomicBoolean(false);
+
+        GridTestUtils.runAsync(() -> {
+            cache.put(nonAffKey, new TestAddress(nonAffKey, "USA", "NYC"));
+
+            putCompleted.set(true);
+        });
+
+        assertTrue(GridTestUtils.waitForCondition(() -> putCompleted.get(), 5_000));
+
+        //internal map in BinaryMetadataFileStore with futures awaiting write operations
+        Map map = GridTestUtils.getFieldValue(
+            (CacheObjectBinaryProcessorImpl)ig1.context().cacheObjects(),  "metadataFileStore", "writer", "preparedWriteTasks");
+
+        assertTrue(!map.isEmpty());
+
+        fileWriteLatch.countDown();
+
+        assertTrue(GridTestUtils.waitForCondition(() -> map.isEmpty(), 5_000));
+    }
+
+    /**
+     * @param expectedBlocked
+     * @throws Exception
+     */
+    private void putRequestFromServer(boolean expectedBlocked) throws Exception {
+        String cacheName = "testCache";
+
+        CacheWriteSynchronizationMode syncMode = expectedBlocked ? FULL_SYNC : PRIMARY_SYNC;
+
+        CacheConfiguration testCacheCfg = new CacheConfiguration(cacheName)
+            .setBackups(2)
+            .setAtomicityMode(CacheAtomicityMode.ATOMIC)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setWriteSynchronizationMode(syncMode);
+
+        IgniteEx ig0 = startGrid(0);
+
+        startGrid(1);
+        final CountDownLatch fileWriteLatch = initSlowFileIOFactory();
+        IgniteEx ig2 = startGrid(2);
+
+        specialFileIOFactory = null;
+        startGrid(3);
+
+        ig0.cluster().active(true);
+        IgniteCache cache = ig0.createCache(testCacheCfg);
+
+        int key = 0;
+        Affinity<Object> aff = ig0.affinity(cacheName);
+
+        while (true) {
+            key = findNonAffinityKeyForNode(aff, ig0.localNode(), key);
+
+            if (aff.isBackup(ig2.localNode(), key))
+                break;
+            else
+                key++;
+        }
+
+        AtomicBoolean putFinished = new AtomicBoolean(false);
+
+        int key0 = key;
+        GridTestUtils.runAsync(() -> {
+            cache.put(key0, new TestAddress(key0, "USA", "NYC"));
+
+            putFinished.set(true);
+        });
+
+        if (expectedBlocked) {
+            assertFalse(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
+
+            fileWriteLatch.countDown();
+
+            assertTrue(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
+        }
+        else
+            assertTrue(GridTestUtils.waitForCondition(() -> putFinished.get(), 5_000));
+    }
+
+    /**
+     * Initializes special FileIOFactory emulating slow write to disk.
+     *
+     * @return Latch to release write operation.
+     */
+    private CountDownLatch initSlowFileIOFactory() {
+        CountDownLatch cdl = new CountDownLatch(1);
+
+        specialFileIOFactory = new SlowFileIOFactory(new RandomAccessFileIOFactory());
+        fileWriteLatchRef.set(cdl);
+
+        return cdl;
+    }
+
+    /**
+     * Deletes directory with persisted binary metadata for a node with given Consistent ID.
+     */
+    private void cleanBinaryMetaFolderForNode(String consId) throws IgniteCheckedException {
+        String dfltWorkDir = U.defaultWorkDirectory();
+        File metaDir = U.resolveWorkDirectory(dfltWorkDir, "binary_meta", false);
+
+        for (File subDir : metaDir.listFiles()) {
+            if (subDir.getName().contains(consId)) {
+                U.delete(subDir);
+
+                return;
+            }
+        }
+    }
+
+    /** Finds a key that target node is neither primary or backup. */
+    private int findNonAffinityKeyForNode(Affinity aff, ClusterNode targetNode, int startFrom) {
+        int key = startFrom;
+
+        while (true) {
+            if (!aff.isPrimaryOrBackup(targetNode, key))
+                return key;
+
+            key++;
+        }
+    }
+
+    /** Finds a key that target node is a primary node for. */
+    private int findAffinityKeyForNode(Affinity aff, ClusterNode targetNode, Integer... excludeKeys) {
+        int key = 0;
+
+        while (true) {
+            if (aff.isPrimary(targetNode, key)
+                && (excludeKeys != null ? !Arrays.asList(excludeKeys).contains(Integer.valueOf(key)) : true))
+                return key;
+
+            key++;
+        }
+    }
+
+    /** */
+    static final class TestPerson {
+        /** */
+        private final int id;
+
+        /** */
+        private final String firstName;
+
+        /** */
+        private final String surname;
+
+        /** */
+        private TestAddress addr;
+
+        /** */
+        TestPerson(int id, String firstName, String surname) {
+            this.id = id;
+            this.firstName = firstName;
+            this.surname = surname;
+        }
+
+        /** */
+        void address(TestAddress addr) {
+            this.addr = addr;
+        }
+    }
+
+    /** */
+    static final class TestAddress {
+        /** */
+        private final int id;
+
+        /** */
+        private final String country;
+
+        /** */
+        private final String city;
+
+        /** */
+        private final String address;
+
+        /** */
+        TestAddress(int id, String country, String city) {
+            this.id = id;
+            this.country = country;
+            this.city = city;
+            this.address = null;
+        }
+
+        /** */
+        TestAddress(int id, String country, String city, String street) {
+            this.id = id;
+            this.country = country;
+            this.city = city;
+            this.address = street;
+        }
+    }
+
+    /** */
+    static final class TestAccount {
+        /** */
+        private final TestPerson person;
+
+        /** */
+        private final int accountId;
+
+        /** */
+        private final long accountBalance;
+
+        /** */
+        TestAccount(TestPerson person, int id, long balance) {
+            this.person = person;
+            accountId = id;
+            accountBalance = balance;
+        }
+    }
+
+    /** */
+    private static boolean isBinaryMetaFile(File file) {
+        return file.getPath().contains("binary_meta");
+    }
+
+    /** */
+    static final class SlowFileIOFactory implements FileIOFactory {
+        /** */
+        private final FileIOFactory delegateFactory;
+
+        /**
+         * @param delegateFactory Delegate factory.
+         */
+        SlowFileIOFactory(FileIOFactory delegateFactory) {
+            this.delegateFactory = delegateFactory;
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            FileIO delegate = delegateFactory.create(file, modes);
+
+            if (isBinaryMetaFile(file))
+                return new SlowFileIO(delegate, fileWriteLatchRef.get());
+
+            return delegate;
+        }
+    }
+
+    /** */
+    static class SlowFileIO extends FileIODecorator {
+        /** */
+        private final CountDownLatch fileWriteLatch;
+
+        /**
+         * @param delegate File I/O delegate
+         */
+        public SlowFileIO(FileIO delegate, CountDownLatch fileWriteLatch) {
+            super(delegate);
+
+            this.fileWriteLatch = fileWriteLatch;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int write(byte[] buf, int off, int len) throws IOException {
+            try {
+                fileWriteLatch.await();
+            }
+            catch (InterruptedException e) {
+                // No-op.
+            }
+
+            return super.write(buf, off, len);
+        }
+    }
+
+    /** */
+    static final class FailingFileIOFactory implements FileIOFactory {
+        /** */
+        private final FileIOFactory delegateFactory;
+
+        /**
+         * @param factory Delegate factory.
+         */
+        FailingFileIOFactory(FileIOFactory factory) {
+            delegateFactory = factory;
+        }
+
+        /** {@inheritDoc}*/
+        @Override public FileIO create(File file, OpenOption... modes) throws IOException {
+            FileIO delegate = delegateFactory.create(file, modes);
+
+            if (isBinaryMetaFile(file))
+                return new FailingFileIO(delegate);
+
+            return delegate;
+        }
+    }
+
+    /** */
+    static final class FailingFileIO extends FileIODecorator {
+        /**
+         * @param delegate File I/O delegate
+         */
+        public FailingFileIO(FileIO delegate) {
+            super(delegate);
+        }
+
+        /** {@inheritDoc}*/
+        @Override public int write(byte[] buf, int off, int len) throws IOException {
+            throw new IOException("Error occured during write of binary metadata");
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 7a26606..2384b4a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -17,6 +17,11 @@
 
 package org.apache.ignite.testframework;
 
+import javax.cache.CacheException;
+import javax.cache.configuration.Factory;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -66,11 +71,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import javax.cache.CacheException;
-import javax.cache.configuration.Factory;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -2380,4 +2380,11 @@
             }
         }
     }
+
+    /**
+     * @param runnableX Runnable with exception.
+     */
+    public static void suppressException(RunnableX runnableX) {
+        runnableX.run();
+    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
index 539f5aa..d58948d 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgnitePdsWithIndexingCoreTestSuite.java
@@ -18,6 +18,7 @@
 
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheHistoricalRebalancingTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheRebalancingTest;
+import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinaryMetadataAsyncWritingTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinaryMetadataOnClusterRestartTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsBinarySortObjectFieldsTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsCorruptedIndexTest;
@@ -80,6 +81,7 @@
     IgnitePdsCacheDestroyDuringCheckpointTest.class,
 
     IgnitePdsBinaryMetadataOnClusterRestartTest.class,
+    IgnitePdsBinaryMetadataAsyncWritingTest.class,
     IgnitePdsMarshallerMappingRestoreOnNodeStartTest.class,
     IgnitePdsThreadInterruptionTest.class,
     IgnitePdsBinarySortObjectFieldsTest.class,