/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.binary;

import java.io.File;
import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.cache.CacheException;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryField;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.binary.BinaryTypeConfiguration;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteNodeAttributes;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.UnregisteredBinaryTypeException;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryEnumObjectImpl;
import org.apache.ignite.internal.binary.BinaryFieldMetadata;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryMetadataHandler;
import org.apache.ignite.internal.binary.BinaryObjectEx;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.cache.CacheDefaultBinaryAffinityKeyMapper;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectByteArrayImpl;
import org.apache.ignite.internal.processors.cacheobject.UserCacheObjectImpl;
import org.apache.ignite.internal.processors.cacheobject.UserKeyCacheObjectImpl;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.MutableSingletonList;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T1;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.IgniteNodeValidationResult;
import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.IgniteDiscoveryThread;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAIT_SCHEMA_UPDATE;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC;
import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata;

/**
 * Binary processor implementation.
 */
public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter implements IgniteCacheObjectProcessor {
    /** Classes whose instances {@link #immutable(Object)} reports as immutable; filled by the static initializer. */
    private static final Collection<Class<?>> IMMUTABLE_CLS = new HashSet<>();

    /**
     * Set to {@code true} in {@link #onKernalStart(boolean)}. While it is {@code false} the metadata handler
     * created in {@link #start()} merges new metadata straight into {@link #metadataLocCache}.
     */
    private volatile boolean discoveryStarted;

    /** Future received in {@link #onDisconnected(IgniteFuture)}; reset to {@code null} on reconnect. */
    private volatile IgniteFuture<?> reconnectFut;

    /** Binary context, created in {@link #start()} when the configured marshaller is a {@link BinaryMarshaller}. */
    private BinaryContext binaryCtx;

    /** Marshaller taken from the node configuration in the constructor. */
    private Marshaller marsh;

    /** Binary marshaller built around {@link #binaryCtx} in {@link #start()}. */
    private GridBinaryMarshaller binaryMarsh;

    /** Metadata file store; created in {@link #start()} on non-client nodes only, otherwise {@code null}. */
    private BinaryMetadataFileStore metadataFileStore;

    /**
     * Custom folder specifying local folder for {@link #metadataFileStore}.<br>
     * {@code null} means no specific folder is configured. <br>
     * In this case folder for metadata is composed from work directory and consistentId <br>
     */
    @Nullable private File binaryMetadataFileStoreDir;

    /**
     * How long to wait for schema if no updates in progress. Read from the {@code IGNITE_WAIT_SCHEMA_UPDATE}
     * system property, defaulting to 30_000 (presumably milliseconds - TODO confirm against the property docs).
     */
    private long waitSchemaTimeout = IgniteSystemProperties.getLong(IGNITE_WAIT_SCHEMA_UPDATE, 30_000);

    /** For tests: when {@code true}, {@link #start()} creates a {@code TestBinaryContext} instead of the regular one. */
    @SuppressWarnings("PublicField")
    public static boolean useTestBinaryCtx;

    /** {@link IgniteBinary} facade created in {@link #start()}. */
    @GridToStringExclude
    private IgniteBinary binaries;

    /** Locally cached metadata. This local cache is managed by exchanging discovery custom events. */
    private final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache = new ConcurrentHashMap<>();

    /** Metadata transport created in {@link #start()}; used to request and await metadata updates. */
    private BinaryMetadataTransport transport;

    /** Cached affinity key field names. */
    private final ConcurrentHashMap<Integer, T1<BinaryField>> affKeyFields = new ConcurrentHashMap<>();

    /*
     * Registers the classes that are treated as immutable.
     */
    static {
        IMMUTABLE_CLS.addAll(Arrays.<Class<?>>asList(
            String.class,
            Boolean.class,
            Byte.class,
            Short.class,
            Character.class,
            Integer.class,
            Long.class,
            Float.class,
            Double.class,
            UUID.class,
            IgniteUuid.class,
            BigDecimal.class
        ));
    }

    /**
     * @param ctx Kernal context.
     */
    @SuppressWarnings("deprecation")
    public CacheObjectBinaryProcessorImpl(GridKernalContext ctx) {
        super(ctx);

        marsh = ctx.grid().configuration().getMarshaller();
    }

    /**
     * Computes the binary metadata directory via {@link #binaryWorkDir(String, String)} and makes sure it exists.
     *
     * @param igniteWorkDir Basic ignite working directory.
     * @param consId Node consistent id.
     * @return Working directory.
     * @throws IgniteException If the directory could not be created.
     */
    public static File resolveBinaryWorkDir(String igniteWorkDir, String consId) {
        File metaDir = binaryWorkDir(igniteWorkDir, consId);

        if (U.mkdirs(metaDir))
            return metaDir;

        throw new IgniteException("Could not create directory for binary metadata: " + metaDir);
    }

    /**
     * Builds the path {@code <igniteWorkDir>/<DFLT_BINARY_METADATA_PATH>/<consId>} without touching the file system.
     *
     * @param igniteWorkDir Basic ignite working directory.
     * @param consId Node consistent id.
     * @return Working directory.
     * @throws IgniteException If either argument is empty.
     */
    public static File binaryWorkDir(String igniteWorkDir, String consId) {
        boolean argsSet = !F.isEmpty(igniteWorkDir) && !F.isEmpty(consId);

        if (!argsSet) {
            throw new IgniteException("Work directory or consistent id has not been set " +
                "[igniteWorkDir=" + igniteWorkDir + ", consId=" + consId + ']');
        }

        return Paths
            .get(igniteWorkDir, DataStorageConfiguration.DFLT_BINARY_METADATA_PATH, consId)
            .toFile();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Does nothing unless the configured marshaller is a {@link BinaryMarshaller}. Otherwise it creates, in order:
     * the metadata file store (non-client nodes only), the metadata handler and binary context, the metadata
     * transport, the binary marshaller and the {@link IgniteBinary} facade. It then adds the binary configuration
     * as a node attribute (unless the consistency check is disabled) and, on non-client nodes, calls
     * {@code restoreMetadata()} on the file store.
     */
    @Override public void start() throws IgniteCheckedException {
        if (marsh instanceof BinaryMarshaller) {
            if (!ctx.clientNode()) {
                // Use the explicitly configured directory if there is one, otherwise derive it from
                // the work directory and the resolved PDS folder name.
                metadataFileStore = new BinaryMetadataFileStore(metadataLocCache,
                    ctx,
                    log,
                    binaryMetadataFileStoreDir == null ?
                        resolveBinaryWorkDir(ctx.config().getWorkDirectory(),
                            ctx.pdsFolderResolver().resolveFolders().folderName()) :
                        binaryMetadataFileStoreDir);

                metadataFileStore.start();
            }

            // Handler handed to the binary context; it delegates to this processor.
            BinaryMetadataHandler metaHnd = new BinaryMetadataHandler() {
                @Override public void addMeta(
                    int typeId,
                    BinaryType newMeta,
                    boolean failIfUnregistered) throws BinaryObjectException {
                    assert newMeta != null;
                    assert newMeta instanceof BinaryTypeImpl;

                    // Before onKernalStart() sets the flag, metadata is only merged into the local cache
                    // and no update is requested through the transport.
                    if (!discoveryStarted) {
                        BinaryMetadataHolder holder = metadataLocCache.get(typeId);

                        BinaryMetadata oldMeta = holder != null ? holder.metadata() : null;

                        BinaryMetadata mergedMeta = mergeMetadata(oldMeta, ((BinaryTypeImpl)newMeta).metadata());

                        // Reference comparison: the cache is touched only when the merge produced another instance.
                        if (oldMeta != mergedMeta)
                            metadataLocCache.put(typeId, new BinaryMetadataHolder(mergedMeta, 0, 0));

                        return;
                    }

                    BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();

                    // Re-wrap the raw metadata with this processor's binary context before delegating.
                    CacheObjectBinaryProcessorImpl.this.addMeta(
                        typeId,
                        newMeta0.wrap(binaryCtx),
                        failIfUnregistered
                    );
                }

                // The failIfUnregistered flag is not forwarded: the processor method takes no such argument.
                @Override public void addMetaLocally(int typeId, BinaryType meta, boolean failIfUnregistered)
                    throws BinaryObjectException {
                    CacheObjectBinaryProcessorImpl.this.addMetaLocally(typeId, meta);
                }

                @Override public BinaryType metadata(int typeId) throws BinaryObjectException {
                    return CacheObjectBinaryProcessorImpl.this.metadata(typeId);
                }

                @Override public BinaryMetadata metadata0(int typeId) throws BinaryObjectException {
                    return CacheObjectBinaryProcessorImpl.this.metadata0(typeId);
                }

                @Override public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
                    return CacheObjectBinaryProcessorImpl.this.metadata(typeId, schemaId);
                }

                @Override public Collection<BinaryType> metadata() throws BinaryObjectException {
                    return CacheObjectBinaryProcessorImpl.this.metadata();
                }
            };

            BinaryMarshaller bMarsh0 = (BinaryMarshaller)marsh;

            binaryCtx = useTestBinaryCtx ?
                new TestBinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class)) :
                new BinaryContext(metaHnd, ctx.config(), ctx.log(BinaryContext.class));

            // On client nodes metadataFileStore is still null at this point.
            transport = new BinaryMetadataTransport(metadataLocCache, metadataFileStore, binaryCtx, ctx, log);

            // setBinaryContext is called by name through IgniteUtils.invoke rather than directly.
            IgniteUtils.invoke(BinaryMarshaller.class, bMarsh0, "setBinaryContext", binaryCtx, ctx.config());

            binaryMarsh = new GridBinaryMarshaller(binaryCtx);

            binaries = new IgniteBinaryImpl(ctx, this);

            if (!getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK)) {
                BinaryConfiguration bCfg = ctx.config().getBinaryConfiguration();

                if (bCfg != null) {
                    Map<String, Object> map = new HashMap<>();

                    // Note: the id mapper is stored as a class name while the serializer is stored as a Class.
                    map.put("globIdMapper", bCfg.getIdMapper() != null ? bCfg.getIdMapper().getClass().getName() : null);
                    map.put("globSerializer", bCfg.getSerializer() != null ? bCfg.getSerializer().getClass() : null);
                    map.put("compactFooter", bCfg.isCompactFooter());

                    if (bCfg.getTypeConfigurations() != null) {
                        Map<Object, Object> typeCfgsMap = new HashMap<>();

                        for (BinaryTypeConfiguration c : bCfg.getTypeConfigurations()) {
                            // NOTE(review): the key below is the boolean result of the null check, not the type
                            // name, so all configurations with a non-null name share one key and each put
                            // overwrites the previous entry. Presumably c.getTypeName() was intended; changing it
                            // alters the node attribute added below, so confirm cross-node compatibility first.
                            typeCfgsMap.put(
                                c.getTypeName() != null,
                                Arrays.asList(
                                    c.getIdMapper() != null ? c.getIdMapper().getClass() : null,
                                    c.getSerializer() != null ? c.getSerializer().getClass() : null,
                                    c.isEnum()
                                )
                            );

                            if (c.isEnum())
                                BinaryUtils.validateEnumValues(c.getTypeName(), c.getEnumValues());
                        }

                        map.put("typeCfgs", typeCfgsMap);
                    }

                    ctx.addNodeAttribute(IgniteNodeAttributes.ATTR_BINARY_CONFIGURATION, map);
                }
            }

            // Same condition under which the file store was created above.
            if (!ctx.clientNode())
                metadataFileStore.restoreMetadata();
        }
    }

    /**
     * Registers the listener with the metadata transport; does nothing if the transport has not been created.
     *
     * @param lsnr Listener.
     */
    public void addBinaryMetadataUpdateListener(BinaryMetadataUpdatedListener lsnr) {
        if (transport == null)
            return;

        transport.addBinaryMetadataUpdateListener(lsnr);
    }

    /** {@inheritDoc} */
    @Override public void stop(boolean cancel) {
        // Either component may be absent; the transport is stopped before the file store.
        BinaryMetadataTransport transport0 = transport;

        if (transport0 != null)
            transport0.stop();

        BinaryMetadataFileStore fileStore0 = metadataFileStore;

        if (fileStore0 != null)
            fileStore0.stop();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Stores the reconnect future, notifies the transport (if any), unregisters user type descriptors and
     * binary schemas from the binary context and clears the local metadata cache.
     */
    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) {
        // Kept so that metadata lookups can report the disconnect; cleared again in onReconnected().
        this.reconnectFut = reconnectFut;

        if (transport != null)
            transport.onDisconnected();

        // NOTE(review): unlike transport, the context is not null-checked here - confirm binaryContext()
        // cannot return null when this callback runs.
        binaryContext().unregisterUserTypeDescriptors();
        binaryContext().unregisterBinarySchemas();

        metadataLocCache.clear();
    }

    /** {@inheritDoc} */
    @Override public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted) throws IgniteCheckedException {
        // The node is connected again, so drop the future stored on disconnect.
        this.reconnectFut = null;

        IgniteInternalFuture<?> superFut = super.onReconnected(clusterRestarted);

        return superFut;
    }

    /** {@inheritDoc} */
    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
        super.onKernalStart(active);

        // From now on the metadata handler stops merging metadata directly into the local cache.
        this.discoveryStarted = true;
    }

    /** {@inheritDoc} */
    @Nullable @Override public CacheObject prepareForCache(@Nullable CacheObject obj, GridCacheContext cctx) {
        // A null object stays null; otherwise the object prepares itself against the cache object context.
        return obj == null ? null : obj.prepareForCache(cctx.cacheObjectContext());
    }

    /** {@inheritDoc} */
    @Override public int typeId(String typeName) {
        // Without a binary context there is nothing to resolve the name against, so 0 is returned.
        return binaryCtx != null ? binaryCtx.typeId(typeName) : 0;
    }

    /** {@inheritDoc} */
    @Override public boolean immutable(Object obj) {
        assert obj != null;

        // Exact class match only: subclasses of the registered classes are not considered.
        Class<?> cls = obj.getClass();

        return IMMUTABLE_CLS.contains(cls);
    }

    /** {@inheritDoc} */
    @Override public void onContinuousProcessorStarted(GridKernalContext ctx) {
        // No-op: this processor does nothing at this lifecycle callback.
    }

    /**
     * Marshals the object with the binary marshaller.
     *
     * @param obj Object.
     * @return Bytes.
     * @throws BinaryObjectException If failed.
     */
    public byte[] marshal(@Nullable Object obj) throws BinaryObjectException {
        byte[] bytes = binaryMarsh.marshal(obj, false);

        // The marshaller is expected to always produce at least one byte.
        assert bytes.length > 0;

        return bytes;
    }

    /**
     * Reads a value stored at the pointer as a 4-byte size, a 1-byte type and then the payload.
     *
     * @param ptr Off-heap pointer.
     * @param forceHeap If {@code true} creates heap-based object.
     * @return Object.
     * @throws BinaryObjectException If failed.
     */
    public Object unmarshal(long ptr, boolean forceHeap) throws BinaryObjectException {
        assert ptr > 0 : ptr;

        int len = GridUnsafe.getInt(ptr);

        // The type byte follows the 4-byte size; the payload starts right after the type byte.
        long typePtr = ptr + 4;

        byte type = GridUnsafe.getByte(typePtr);

        long dataPtr = typePtr + 1;

        // Plain byte arrays are copied out as is, without going through the marshaller.
        if (type == CacheObject.TYPE_BYTE_ARR)
            return U.copyMemory(dataPtr, len);

        assert len > 0 : len;

        BinaryInputStream in = new BinaryOffheapInputStream(dataPtr, len, forceHeap);

        return binaryMarsh.unmarshal(in);
    }

    /** {@inheritDoc} */
    @Override public Object marshalToBinary(
        @Nullable Object obj,
        boolean failIfUnregistered
    ) throws BinaryObjectException {
        if (obj == null)
            return null;

        if (BinaryUtils.isBinaryType(obj.getClass()))
            return obj;

        // Arrays: convert element by element into a new Object[].
        if (obj instanceof Object[]) {
            Object[] src = (Object[])obj;

            Object[] converted = new Object[src.length];

            int idx = 0;

            for (Object item : src)
                converted[idx++] = marshalToBinary(item, failIfUnregistered);

            return converted;
        }

        // Tuples: convert both components, keeping T2 as T2.
        if (obj instanceof IgniteBiTuple) {
            IgniteBiTuple tup = (IgniteBiTuple)obj;

            Object first = marshalToBinary(tup.get1(), failIfUnregistered);
            Object second = marshalToBinary(tup.get2(), failIfUnregistered);

            if (obj instanceof T2)
                return new T2<>(first, second);

            return new IgniteBiTuple<>(first, second);
        }

        // Known collection types: fill the new collection returned by BinaryUtils with converted items.
        Collection<Object> binCol = BinaryUtils.newKnownCollection(obj);

        if (binCol != null) {
            for (Object item : (Collection<?>)obj)
                binCol.add(marshalToBinary(item, failIfUnregistered));

            if (binCol instanceof MutableSingletonList)
                return U.convertToSingletonList(binCol);

            return binCol;
        }

        // Known map types: convert keys and values.
        Map<Object, Object> binMap = BinaryUtils.newKnownMap(obj);

        if (binMap != null) {
            for (Map.Entry<?, ?> entry : ((Map<?, ?>)obj).entrySet()) {
                binMap.put(marshalToBinary(entry.getKey(), failIfUnregistered),
                    marshalToBinary(entry.getValue(), failIfUnregistered));
            }

            return binMap;
        }

        if (obj instanceof Map.Entry) {
            Map.Entry<?, ?> entry = (Map.Entry<?, ?>)obj;

            return new GridMapEntry<>(marshalToBinary(entry.getKey(), failIfUnregistered),
                marshalToBinary(entry.getValue(), failIfUnregistered));
        }

        // No need to go through marshal-unmarshal because result will be the same as initial object.
        if (binaryMarsh.mustDeserialize(obj))
            return obj;

        byte[] bytes = binaryMarsh.marshal(obj, failIfUnregistered);

        assert bytes.length > 0;

        Object binObj = binaryMarsh.unmarshal(bytes, null);

        // Possible if a class has writeObject method.
        if (binObj instanceof BinaryObjectImpl)
            ((BinaryObjectImpl)binObj).detachAllowed(true);

        return binObj;
    }

    /**
     * Gives access to the binary marshaller created in {@link #start()}.
     *
     * @return Marshaller.
     */
    public GridBinaryMarshaller marshaller() {
        return this.binaryMarsh;
    }

    /** {@inheritDoc} */
    @Override public BinaryObjectBuilder builder(String clsName) {
        // A fresh builder bound to this processor's binary context.
        BinaryObjectBuilder bldr = new BinaryObjectBuilderImpl(binaryCtx, clsName);

        return bldr;
    }

    /** {@inheritDoc} */
    @Override public BinaryObjectBuilder builder(BinaryObject binaryObj) {
        // The builder is created from the existing binary object.
        BinaryObjectBuilder bldr = BinaryObjectBuilderImpl.wrap(binaryObj);

        return bldr;
    }

    /** {@inheritDoc} */
    @Override public void updateMetadata(
        int typeId,
        String typeName,
        @Nullable String affKeyFieldName,
        Map<String, BinaryFieldMetadata> fieldTypeIds,
        boolean isEnum,
        @Nullable Map<String, Integer> enumMap
    ) throws BinaryObjectException {
        // Assemble the metadata from the arguments and hand it to the binary context.
        binaryCtx.updateMetadata(
            typeId,
            new BinaryMetadata(typeId, typeName, fieldTypeIds, affKeyFieldName, null, isEnum, enumMap),
            false);
    }

    /** {@inheritDoc} */
    @Override public void addMeta(final int typeId, final BinaryType newMeta, boolean failIfUnregistered)
        throws BinaryObjectException {
        assert newMeta != null;
        assert newMeta instanceof BinaryTypeImpl;

        BinaryMetadata meta = ((BinaryTypeImpl)newMeta).metadata();

        // In this mode nothing is sent: the call only checks registration and may throw.
        if (failIfUnregistered) {
            failIfUnregistered(typeId, meta);

            return;
        }

        try {
            GridFutureAdapter<MetadataUpdateResult> updFut = transport.requestMetadataUpdate(meta);

            // The transport returns no future when the update is skipped.
            if (updFut == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Metadata update was skipped [typeId=" + typeId
                        + ", typeName=" + newMeta.typeName() + ']');
                }

                return;
            }

            long startNanos = System.nanoTime();

            MetadataUpdateResult updRes = updFut.get();

            if (log.isDebugEnabled()) {
                IgniteInternalTx tx = ctx.cache().context().tm().tx();

                log.debug("Completed metadata update [typeId=" + typeId +
                    ", typeName=" + newMeta.typeName() +
                    ", waitTime=" + NANOSECONDS.toMillis(System.nanoTime() - startNanos) + "ms" +
                    ", fut=" + updFut +
                    ", tx=" + CU.txString(tx) +
                    ']');
            }

            assert updRes != null;

            if (updRes.rejected())
                throw updRes.error();

            // Only non-client nodes have a file store to wait on.
            if (!ctx.clientNode())
                metadataFileStore.waitForWriteCompletion(typeId, updRes.typeVersion());
        }
        catch (IgniteCheckedException e) {
            IgniteCheckedException cause;

            if (ctx.isStopping()) {
                // Report node stop as the cause and keep the original failure as suppressed.
                cause = new NodeStoppingException("Node is stopping.");

                cause.addSuppressed(e);
            }
            else
                cause = e;

            throw new BinaryObjectException("Failed to update metadata for type: " + newMeta.typeName(), cause);
        }
    }

    /**
     * Throws {@link UnregisteredBinaryTypeException} if merging the given metadata into the locally cached one
     * would change it, or if a pending update of the cached metadata has not completed yet.
     *
     * @param typeId Type id.
     * @param newMeta0 Expected binary metadata.
     */
    private void failIfUnregistered(int typeId, BinaryMetadata newMeta0) {
        BinaryMetadataHolder holder = metadataLocCache.get(typeId);

        BinaryMetadata cachedMeta = holder == null ? null : holder.metadata();

        BinaryMetadata merged = mergeMetadata(cachedMeta, newMeta0);

        // Reference comparison: a different instance means the cached metadata does not cover the new one.
        if (merged != cachedMeta)
            throw new UnregisteredBinaryTypeException(typeId, merged);

        if (holder.pendingVersion() != holder.acceptedVersion()) {
            // Metadata locally is up-to-date. Waiting for updating metadata in an entire cluster, if necessary.
            GridFutureAdapter<MetadataUpdateResult> updFut =
                transport.awaitMetadataUpdate(typeId, holder.pendingVersion());

            if (!updFut.isDone())
                throw new UnregisteredBinaryTypeException(typeId, updFut);
        }
    }

    /** {@inheritDoc} */
    @Override public void addMetaLocally(int typeId, BinaryType newMeta) throws BinaryObjectException {
        assert newMeta != null;
        assert newMeta instanceof BinaryTypeImpl;

        BinaryMetadata incoming = ((BinaryTypeImpl)newMeta).metadata();

        BinaryMetadataHolder holder = metadataLocCache.get(typeId);

        BinaryMetadata cachedMeta = holder == null ? null : holder.metadata();

        try {
            BinaryMetadata merged = mergeMetadata(cachedMeta, incoming);

            // Only non-client nodes have a file store to write to.
            if (!ctx.clientNode())
                metadataFileStore.mergeAndWriteMetadata(merged);

            BinaryMetadataHolder newHolder = new BinaryMetadataHolder(merged, 0, 0);

            metadataLocCache.put(typeId, newHolder);
        }
        catch (BinaryObjectException e) {
            // Re-thrown with a hint on how to recover, keeping the original failure as the cause.
            throw new BinaryObjectException("New binary metadata is incompatible with binary metadata" +
                " persisted locally." +
                " Consider cleaning up persisted metadata from <workDir>/db/binary_meta directory.", e);
        }
    }

    /** {@inheritDoc} */
    @Nullable @Override public BinaryType metadata(final int typeId) {
        BinaryMetadata rawMeta = metadata0(typeId);

        if (rawMeta == null)
            return null;

        // Expose the raw metadata as a BinaryType bound to this processor's binary context.
        return rawMeta.wrap(binaryCtx);
    }

    /**
     * Forces caller thread to wait for binary metadata write operation for given type ID.
     *
     * In case of in-memory mode this method becomes a No-op as no binary metadata is written to disk in this mode.
     * It is also a no-op when no metadata is cached locally for the type. A failure while waiting is logged as
     * a warning and not propagated to the caller.
     *
     * @param typeId ID of binary type to wait for metadata write operation.
     */
    public void waitMetadataWriteIfNeeded(final int typeId) {
        if (metadataFileStore == null)
            return;

        BinaryMetadataHolder hldr = metadataLocCache.get(typeId);

        if (hldr == null)
            return;

        try {
            metadataFileStore.waitForWriteCompletion(typeId, hldr.pendingVersion());
        }
        catch (IgniteCheckedException e) {
            // Report the version that was actually awaited (pending), not the accepted one.
            log.warning("Failed to wait for metadata write operation for [typeId=" + typeId +
                ", typeVer=" + hldr.pendingVersion() + ']', e);
        }
    }

    /**
     * Returns the locally cached metadata for the type, waiting for in-flight operations where needed.
     * <p>
     * If nothing is cached, a client node first requests up-to-date metadata through the transport, unless the
     * current {@link IgniteThread} is forbidden to request binary metadata. For a cached holder:
     * <ul>
     *     <li>if it is marked as removing, the removal is awaited and {@code null} is returned;</li>
     *     <li>discovery threads and threads forbidden to request metadata get the cached value without waiting;</li>
     *     <li>if the pending version is ahead of the accepted one, the pending update is awaited;</li>
     *     <li>otherwise, when a file store exists, the write of the pending version is awaited.</li>
     * </ul>
     *
     * @param typeId Type ID.
     * @return Metadata, or {@code null} if none is available, it is being removed, or waiting for its write failed.
     * @throws IgniteException In case of error.
     */
    @Nullable public BinaryMetadata metadata0(final int typeId) {
        BinaryMetadataHolder holder = metadataLocCache.get(typeId);

        IgniteThread curThread = IgniteThread.current();

        if (holder == null && (curThread == null || !curThread.isForbiddenToRequestBinaryMetadata())) {
            if (ctx.clientNode()) {
                try {
                    transport.requestUpToDateMetadata(typeId).get();

                    holder = metadataLocCache.get(typeId);
                }
                catch (IgniteCheckedException ignored) {
                    // No-op: a failed request leaves the holder null and the method returns null below.
                }
            }
        }

        if (holder != null) {
            if (holder.removing()) {
                GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataRemove(typeId);

                try {
                    fut.get();
                }
                catch (IgniteCheckedException ignored) {
                    // No-op: the metadata is reported as absent regardless of how the removal ended.
                }

                return null;
            }

            // These threads must not block here, so they get whatever is cached right now.
            if (curThread instanceof IgniteDiscoveryThread || (curThread != null && curThread.isForbiddenToRequestBinaryMetadata()))
                return holder.metadata();

            if (holder.pendingVersion() - holder.acceptedVersion() > 0) {
                GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(typeId, holder.pendingVersion());

                if (log.isDebugEnabled() && !fut.isDone())
                    log.debug("Waiting for update for" +
                        " [typeId=" + typeId +
                        ", pendingVer=" + holder.pendingVersion() +
                        ", acceptedVer=" + holder.acceptedVersion() + "]");

                try {
                    fut.get();
                }
                catch (IgniteCheckedException ignored) {
                    // No-op: the cached metadata is returned even if waiting for the update failed.
                }
            }
            else if (metadataFileStore != null) {
                try {
                    metadataFileStore.waitForWriteCompletion(typeId, holder.pendingVersion());
                }
                catch (IgniteCheckedException e) {
                    // Report the version that was actually awaited (pending), not the accepted one.
                    log.warning("Failed to wait for metadata write operation for [typeId=" + typeId +
                        ", typeVer=" + holder.pendingVersion() + ']', e);

                    return null;
                }
            }

            return holder.metadata();
        }
        else
            return null;
    }

    /** {@inheritDoc} */
    @Nullable @Override public BinaryType metadata(final int typeId, final int schemaId) {
        BinaryMetadataHolder holder = metadataLocCache.get(typeId);

        if (ctx.clientNode()) {
            // Client path: when the type or the requested schema is missing locally, request up-to-date
            // metadata through the transport and block until that request completes.
            if (holder == null || !holder.metadata().hasSchema(schemaId)) {
                if (log.isDebugEnabled())
                    log.debug("Waiting for client metadata update" +
                        " [typeId=" + typeId
                        + ", schemaId=" + schemaId
                        + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion())
                        + ", acceptedVer=" + (holder == null ? "NA" : holder.acceptedVersion()) + ']');

                try {
                    transport.requestUpToDateMetadata(typeId).get();
                }
                catch (IgniteCheckedException ignored) {
                    // No-op: the local cache is re-read below regardless of the request outcome.
                }

                holder = metadataLocCache.get(typeId);

                // Read the field once so the null check and the thrown exception use the same future.
                IgniteFuture<?> reconnectFut0 = reconnectFut;

                if (holder == null && reconnectFut0 != null)
                    throw new IgniteClientDisconnectedException(reconnectFut0, "Client node disconnected.");

                if (log.isDebugEnabled())
                    log.debug("Finished waiting for client metadata update" +
                        " [typeId=" + typeId
                        + ", schemaId=" + schemaId
                        + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion())
                        + ", acceptedVer=" + (holder == null ? "NA" : holder.acceptedVersion()) + ']');
            }
        }
        else {
            // Non-client path. A discovery thread never waits: it returns whatever is cached right away.
            if (holder != null && IgniteThread.current() instanceof IgniteDiscoveryThread)
                return holder.metadata().wrap(binaryCtx);
            else if (holder != null && (holder.pendingVersion() - holder.acceptedVersion() > 0)) {
                // Pending version is ahead of the accepted one: wait for the pending version via the transport.
                if (log.isDebugEnabled())
                    log.debug("Waiting for metadata update" +
                        " [typeId=" + typeId
                        + ", schemaId=" + schemaId
                        + ", pendingVer=" + holder.pendingVersion()
                        + ", acceptedVer=" + holder.acceptedVersion() + ']');

                long t0 = System.nanoTime();

                GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(
                    typeId,
                    holder.pendingVersion());

                try {
                    fut.get();
                }
                catch (IgniteCheckedException e) {
                    log.error("Failed to wait for metadata update [typeId=" + typeId + ", schemaId=" + schemaId + ']', e);
                }

                // The elapsed time is measured in nanoseconds and must be converted TO milliseconds
                // (the previous NANOSECONDS.convert(..., MILLISECONDS) multiplied it by 10^6 instead).
                if (log.isDebugEnabled())
                    log.debug("Finished waiting for metadata update" +
                        " [typeId=" + typeId
                        + ", waitTime=" + NANOSECONDS.toMillis(System.nanoTime() - t0) + "ms"
                        + ", schemaId=" + schemaId
                        + ", pendingVer=" + holder.pendingVersion()
                        + ", acceptedVer=" + holder.acceptedVersion() + ']');

                holder = metadataLocCache.get(typeId);
            }
            else if (holder == null || !holder.metadata().hasSchema(schemaId)) {
                // Last resort waiting.
                U.warn(log,
                    "Schema is missing while no metadata updates are in progress " +
                        "(will wait for schema update within timeout defined by " + IGNITE_WAIT_SCHEMA_UPDATE + " system property)" +
                        " [typeId=" + typeId
                        + ", missingSchemaId=" + schemaId
                        + ", pendingVer=" + (holder == null ? "NA" : holder.pendingVersion())
                        + ", acceptedVer=" + (holder == null ? "NA" : holder.acceptedVersion())
                        + ", binMetaUpdateTimeout=" + waitSchemaTimeout + ']');

                long t0 = System.nanoTime();

                GridFutureAdapter<?> fut = transport.awaitSchemaUpdate(typeId, schemaId);

                try {
                    fut.get(waitSchemaTimeout);
                }
                catch (IgniteFutureTimeoutCheckedException e) {
                    log.error("Timed out while waiting for schema update [typeId=" + typeId + ", schemaId=" +
                        schemaId + ']');
                }
                catch (IgniteCheckedException ignored) {
                    // No-op.
                }

                holder = metadataLocCache.get(typeId);

                // Same unit fix as above: nanoseconds -> milliseconds.
                if (log.isDebugEnabled() && holder != null && holder.metadata().hasSchema(schemaId))
                    log.debug("Found the schema after wait" +
                        " [typeId=" + typeId
                        + ", waitTime=" + NANOSECONDS.toMillis(System.nanoTime() - t0) + "ms"
                        + ", schemaId=" + schemaId
                        + ", pendingVer=" + holder.pendingVersion()
                        + ", acceptedVer=" + holder.acceptedVersion() + ']');
            }
        }

        // When a file store is configured, block on its write completion for the holder's pending version;
        // a failure is logged and reported to the caller as missing metadata (null).
        if (holder != null && metadataFileStore != null) {
            try {
                metadataFileStore.waitForWriteCompletion(typeId, holder.pendingVersion());
            }
            catch (IgniteCheckedException e) {
                log.warning("Failed to wait for metadata write operation for [typeId=" + typeId +
                    ", typeVer=" + holder.acceptedVersion() + ']', e);

                return null;
            }
        }

        return holder != null ? holder.metadata().wrap(binaryCtx) : null;
    }

    /** {@inheritDoc} */
    @Override public Map<Integer, BinaryType> metadata(Collection<Integer> typeIds)
        throws BinaryObjectException {
        // NOTE(review): typeIds is never consulted; every locally cached type is returned - confirm intended.
        try {
            Map<Integer, BinaryType> typesById = U.newHashMap(metadataLocCache.size());

            for (Map.Entry<Integer, BinaryMetadataHolder> cached : metadataLocCache.entrySet()) {
                BinaryMetadata meta = cached.getValue().metadata();

                typesById.put(cached.getKey(), meta.wrap(binaryCtx));
            }

            return typesById;
        }
        catch (CacheException err) {
            // Surface cache failures as a binary-layer exception, keeping the cause.
            throw new BinaryObjectException(err);
        }
    }

    /** {@inheritDoc} */
    @Override public Collection<BinaryType> metadata() throws BinaryObjectException {
        // Maps each cached holder to its metadata wrapped with this processor's binary context.
        IgniteClosure<BinaryMetadataHolder, BinaryType> toBinaryType = hld -> hld.metadata().wrap(binaryCtx);

        return F.viewReadOnly(metadataLocCache.values(), toBinaryType);
    }

    /**
     * Exposes the metadata of every holder in the local metadata cache as a read-only view.
     *
     * @return Cluster binary metadata.
     * @throws BinaryObjectException on error.
     */
    public Collection<BinaryMetadata> binaryMetadata() throws BinaryObjectException {
        IgniteClosure<BinaryMetadataHolder, BinaryMetadata> toMetadata = BinaryMetadataHolder::metadata;

        return F.viewReadOnly(metadataLocCache.values(), toMetadata);
    }

    /**
     * Looks up the locally cached metadata of a single type.
     *
     * @param typeId Type ID.
     * @return Binary metadata for specified type, or {@code null} if the local cache has no holder for it.
     * @throws BinaryObjectException on error.
     */
    public BinaryMetadata binaryMetadata(int typeId) throws BinaryObjectException {
        BinaryMetadataHolder cached = metadataLocCache.get(typeId);

        if (cached == null)
            return null;

        return cached.metadata();
    }

    /** {@inheritDoc} */
    @Override public void saveMetadata(Collection<BinaryType> types, File dir) {
        try {
            String dirPath = dir.getAbsolutePath();
            String folderName = ctx.pdsFolderResolver().resolveFolders().folderName();

            // A dedicated store with its own empty map is created for the target directory.
            BinaryMetadataFileStore store = new BinaryMetadataFileStore(
                new ConcurrentHashMap<>(),
                ctx,
                log,
                resolveBinaryWorkDir(dirPath, folderName));

            for (BinaryType binType : types) {
                BinaryTypeImpl impl = (BinaryTypeImpl)binType;

                store.mergeAndWriteMetadata(impl.metadata());
            }
        }
        catch (IgniteCheckedException err) {
            throw new IgniteException(err);
        }
    }

    /** {@inheritDoc} */
    @Override public BinaryObject buildEnum(String typeName, int ord) throws BinaryObjectException {
        A.notNullOrEmpty(typeName, "enum type name");

        // The type ID is resolved from the name as passed in; the user type name is derived afterwards.
        final int enumTypeId = binaryCtx.typeId(typeName);
        final String userTypeName = binaryCtx.userTypeName(typeName);

        updateMetadata(enumTypeId, userTypeName, null, null, true, null);

        return new BinaryEnumObjectImpl(binaryCtx, enumTypeId, null, ord);
    }

    /** {@inheritDoc} */
    @Override public BinaryObject buildEnum(String typeName, String name) throws BinaryObjectException {
        A.notNullOrEmpty(typeName, "enum type name");
        A.notNullOrEmpty(name, "enum name");

        final int enumTypeId = binaryCtx.typeId(typeName);

        BinaryMetadata enumMeta = metadata0(enumTypeId);

        // Without metadata the name cannot be mapped to an ordinal.
        if (enumMeta == null) {
            throw new BinaryObjectException("Failed to get metadata for type [typeId=" +
                enumTypeId + ", typeName='" + typeName + "']");
        }

        Integer ord = enumMeta.getEnumOrdinalByName(name);

        // The second error message reports the user type name, not the name passed in.
        String userTypeName = binaryCtx.userTypeName(typeName);

        if (ord == null) {
            throw new BinaryObjectException("Failed to resolve enum ordinal by name [typeId=" +
                enumTypeId + ", typeName='" + userTypeName + "', name='" + name + "']");
        }

        return new BinaryEnumObjectImpl(binaryCtx, enumTypeId, null, ord);
    }

    /** {@inheritDoc} */
    @Override public BinaryType registerEnum(String typeName, Map<String, Integer> vals) throws BinaryObjectException {
        A.notNullOrEmpty(typeName, "enum type name");

        final int enumTypeId = binaryCtx.typeId(typeName);
        final String userTypeName = binaryCtx.userTypeName(typeName);

        // Values are validated before any metadata update is issued.
        BinaryUtils.validateEnumValues(userTypeName, vals);

        updateMetadata(enumTypeId, userTypeName, null, null, true, vals);

        return binaryCtx.metadata(enumTypeId);
    }

    /** {@inheritDoc} */
    @Override public IgniteBinary binary() throws IgniteException {
        // Returns the IgniteBinary instance held in the binaries field.
        return binaries;
    }

    /** {@inheritDoc} */
    @Override public boolean isBinaryObject(Object obj) {
        // Pure type check; null yields false.
        return obj instanceof BinaryObject;
    }

    /** {@inheritDoc} */
    @Override public boolean isBinaryEnabled(CacheConfiguration<?, ?> ccfg) {
        // The cache configuration is not consulted: the answer depends only on the marshaller type.
        return marsh instanceof BinaryMarshaller;
    }

    /**
     * Get affinity key field. The lookup result (including "no affinity field") is cached per type ID.
     *
     * @param typeId Binary object type ID.
     * @return Affinity key field, or {@code null} if the binary context reports no affinity key field name
     *      for the type.
     */
    public BinaryField affinityKeyField(int typeId) {
        // Fast path: a cached holder exists, possibly wrapping null.
        T1<BinaryField> cached = affKeyFields.get(typeId);

        if (cached != null)
            return cached.get();

        // Slow path: resolve the field name and build the field when there is one.
        String affFieldName = binaryCtx.affinityKeyFieldName(typeId);

        BinaryField affField = affFieldName == null ? null : binaryCtx.createField(typeId, affFieldName);

        affKeyFields.putIfAbsent(typeId, new T1<>(affField));

        return affField;
    }

    /** {@inheritDoc} */
    @Override public int typeId(Object obj) {
        if (obj == null)
            return 0;

        // Binary objects carry their own type ID.
        if (isBinaryObject(obj))
            return ((BinaryObjectEx)obj).typeId();

        // Anything else is resolved by the simple name of its class.
        return typeId(obj.getClass().getSimpleName());
    }

    /** {@inheritDoc} */
    @Override public Object field(Object obj, String fieldName) {
        // Only binary objects expose fields; null and every other value yield null.
        if (obj == null || !isBinaryObject(obj))
            return null;

        return ((BinaryObject)obj).field(fieldName);
    }

    /** {@inheritDoc} */
    @Override public boolean hasField(Object obj, String fieldName) {
        // Guard by type rather than by null only: a non-binary object now yields false instead of
        // failing with ClassCastException, matching how field() treats non-binary objects.
        return obj != null && isBinaryObject(obj) && ((BinaryObject)obj).hasField(fieldName);
    }

    /**
     * Gives access to the binary context held by this processor.
     *
     * @return Binary context.
     */
    public BinaryContext binaryContext() {
        return binaryCtx;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("deprecation")
    @Override public CacheObjectContext contextForCache(CacheConfiguration ccfg) throws IgniteCheckedException {
        assert ccfg != null;

        // True when copy-on-read is off, or when binary mode is off while either queries or
        // peer class loading are enabled.
        boolean storeVal = !ccfg.isCopyOnRead() || (!isBinaryEnabled(ccfg) &&
            (QueryUtils.isEnabled(ccfg) || ctx.config().isPeerClassLoadingEnabled()));

        // System caches never run in binary mode, even with the binary marshaller configured.
        boolean binaryEnabled = marsh instanceof BinaryMarshaller && !GridCacheUtils.isSystemCache(ccfg.getName());

        // The unused local that used to hold ccfg.getAffinityMapper() was removed; the configured
        // mapper is only needed for the custom-mapper check below.
        AffinityKeyMapper dfltAffMapper = binaryEnabled ?
            new CacheDefaultBinaryAffinityKeyMapper(ccfg.getKeyConfiguration()) :
            new GridCacheDefaultAffinityKeyMapper();

        ctx.resource().injectGeneric(dfltAffMapper);

        return new CacheObjectContext(ctx,
            ccfg.getName(),
            dfltAffMapper,
            QueryUtils.isCustomAffinityMapper(ccfg.getAffinityMapper()),
            ccfg.isCopyOnRead(),
            storeVal,
            ctx.config().isPeerClassLoadingEnabled() && !isBinaryEnabled(ccfg),
            binaryEnabled
        );
    }

    /** {@inheritDoc} */
    @Override public byte[] marshal(CacheObjectValueContext ctx, Object val) throws IgniteCheckedException {
        // The binary marshaller is used only when the context enables binary mode and a marshaller is set.
        if (ctx.binaryEnabled() && binaryMarsh != null) {
            byte[] bytes = binaryMarsh.marshal(val, false);

            assert bytes.length > 0;

            return bytes;
        }

        return CU.marshal(ctx.kernalContext().cache().context(), ctx.addDeploymentInfo(), val);
    }

    /** {@inheritDoc} */
    @Override public Object unmarshal(CacheObjectValueContext ctx, byte[] bytes, ClassLoader clsLdr)
        throws IgniteCheckedException {
        // Mirror of marshal(): binary path only when binary mode is on and a binary marshaller is set.
        if (ctx.binaryEnabled() && binaryMarsh != null)
            return binaryMarsh.unmarshal(bytes, clsLdr);

        ClassLoader ldr = U.resolveClassLoader(clsLdr, ctx.kernalContext().config());

        return U.unmarshal(ctx.kernalContext(), bytes, ldr);
    }

    /** {@inheritDoc} */
    @Override public KeyCacheObject toCacheKeyObject(CacheObjectContext ctx, @Nullable GridCacheContext cctx,
        Object obj, boolean userObj) {
        final boolean binary = ctx.binaryEnabled();

        if (obj instanceof KeyCacheObject) {
            KeyCacheObject key = (KeyCacheObject)obj;

            // In binary mode a BinaryObjectImpl key is copied because the key can be reused at the
            // application layer after that (IGNITE-3505).
            if (binary && key instanceof BinaryObjectImpl)
                return key.copy(partition(ctx, cctx, key));

            // Other KeyCacheObjects are assumed not to be reused for another cache, so the partition
            // is assigned in place when it is still unset.
            if (key.partition() == -1)
                key.partition(partition(ctx, cctx, key));

            return key;
        }

        if (binary) {
            obj = toBinary(obj, false);

            if (obj instanceof BinaryObjectImpl) {
                KeyCacheObject binKey = (KeyCacheObject)obj;

                binKey.partition(partition(ctx, cctx, obj));

                return binKey;
            }
        }

        return toCacheKeyObject0(ctx, cctx, obj, userObj);
    }

    /**
     * Wraps a plain object into a key cache object, computing its partition first.
     *
     * @param ctx Cache objects context.
     * @param cctx Cache context, may be {@code null}.
     * @param obj Object.
     * @param userObj If {@code true} then given object is object provided by user and should be copied
     *        before stored in cache.
     * @return Key cache object.
     */
    protected KeyCacheObject toCacheKeyObject0(CacheObjectContext ctx,
        @Nullable GridCacheContext cctx,
        Object obj,
        boolean userObj) {
        final int keyPart = partition(ctx, cctx, obj);

        return userObj ?
            new UserKeyCacheObjectImpl(obj, keyPart) :
            new KeyCacheObjectImpl(obj, null, keyPart);
    }

    /** {@inheritDoc} */
    @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj,
        boolean userObj, boolean failIfUnregistered) {
        final boolean binary = ctx.binaryEnabled();

        // Null and ready-made cache objects pass through untouched in both modes.
        if (obj == null || obj instanceof CacheObject)
            return (CacheObject)obj;

        if (binary) {
            obj = toBinary(obj, failIfUnregistered);

            // The binary conversion may already yield a cache object.
            if (obj instanceof CacheObject)
                return (CacheObject)obj;
        }

        return toCacheObject0(obj, userObj);
    }

    /**
     * Wraps a non-null object into a cache object; byte arrays get dedicated implementations.
     *
     * @param obj Object.
     * @param userObj If {@code true} then given object is object provided by user and should be copied
     *        before stored in cache.
     * @return Cache object.
     */
    private CacheObject toCacheObject0(@Nullable Object obj, boolean userObj) {
        assert obj != null;

        if (obj instanceof byte[]) {
            byte[] arr = (byte[])obj;

            return userObj ? new UserCacheObjectByteArrayImpl(arr) : new CacheObjectByteArrayImpl(arr);
        }

        return userObj ? new UserCacheObjectImpl(obj, null) : new CacheObjectImpl(obj, null);
    }

    /**
     * Computes the partition of an object, preferring the cache context's affinity when it is available.
     *
     * @param ctx Cache objects context.
     * @param cctx Cache context.
     * @param obj Object.
     * @return Object partition, or {@code -1} if the lookup failed with {@link IgniteCheckedException}.
     */
    private int partition(CacheObjectContext ctx, @Nullable GridCacheContext cctx, Object obj) {
        try {
            if (cctx != null)
                return cctx.affinity().partition(obj, false);

            // No cache context: fall back to the affinity processor, addressed by cache name.
            return ctx.kernalContext().affinity().partition0(ctx.cacheName(), obj, null);
        }
        catch (IgniteCheckedException err) {
            U.error(log, "Failed to get partition", err);

            return -1;
        }
    }

    /** {@inheritDoc} */
    @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] bytes) {
        // Dispatch on the object type marker.
        switch (type) {
            case BinaryObjectImpl.TYPE_BINARY:
                return new BinaryObjectImpl(binaryContext(), bytes, 0);

            case BinaryObjectImpl.TYPE_BINARY_ENUM:
                return new BinaryEnumObjectImpl(binaryContext(), bytes);

            case CacheObject.TYPE_BYTE_ARR:
                return new CacheObjectByteArrayImpl(bytes);

            case CacheObject.TYPE_REGULAR:
                return new CacheObjectImpl(null, bytes);

            default:
                throw new IllegalArgumentException("Invalid object type: " + type);
        }
    }

    /** {@inheritDoc} */
    @Override public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type, byte[] bytes)
        throws IgniteCheckedException {
        switch (type) {
            case BinaryObjectImpl.TYPE_BINARY:
                return new BinaryObjectImpl(binaryContext(), bytes, 0);

            case CacheObject.TYPE_BYTE_ARR:
                throw new IllegalArgumentException("Byte arrays cannot be used as cache keys.");

            case CacheObject.TYPE_REGULAR: {
                // Regular keys are unmarshalled eagerly; the partition is left unset (-1).
                Object keyVal = ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes, null);

                return new KeyCacheObjectImpl(keyVal, bytes, -1);
            }

            default:
                throw new IllegalArgumentException("Invalid object type: " + type);
        }
    }

    /** {@inheritDoc} */
    @Override public CacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf) {
        // Layout read here: int length, then byte type, then the payload of that length.
        final int payloadLen = buf.getInt();

        assert payloadLen >= 0 : payloadLen;

        final byte objType = buf.get();

        byte[] payload = new byte[payloadLen];

        buf.get(payload);

        return toCacheObject(ctx, objType, payload);
    }

    /** {@inheritDoc} */
    @Override public IncompleteCacheObject toCacheObject(CacheObjectContext ctx, ByteBuffer buf,
        @Nullable IncompleteCacheObject incompleteObj) {
        // Start a new accumulator on the first call, otherwise continue the one passed in.
        IncompleteCacheObject acc = incompleteObj != null ? incompleteObj : new IncompleteCacheObject(buf);

        if (!acc.isReady()) {
            acc.readData(buf);

            // Materialize the cache object once all data has been read.
            if (acc.isReady())
                acc.object(toCacheObject(ctx, acc.type(), acc.data()));
        }

        return acc;
    }

    /** {@inheritDoc} */
    @Override public IncompleteCacheObject toKeyCacheObject(CacheObjectContext ctx, ByteBuffer buf,
        @Nullable IncompleteCacheObject incompleteObj) throws IgniteCheckedException {
        // Start a new accumulator on the first call, otherwise continue the one passed in.
        IncompleteCacheObject acc = incompleteObj != null ? incompleteObj : new IncompleteCacheObject(buf);

        if (!acc.isReady()) {
            acc.readData(buf);

            // Materialize the key object once all data has been read.
            if (acc.isReady())
                acc.object(toKeyCacheObject(ctx, acc.type(), acc.data()));
        }

        return acc;
    }

    /** {@inheritDoc} */
    @Nullable @Override public CacheObject toCacheObject(CacheObjectContext ctx, @Nullable Object obj,
        boolean userObj) {
        // Delegates to the four-argument overload with failIfUnregistered = false.
        return toCacheObject(ctx, obj, userObj, false);
    }

    /** {@inheritDoc} */
    @Override public Object unwrapTemporary(GridCacheContext ctx, Object obj) throws BinaryObjectException {
        // Only off-heap binary objects of a binary-enabled cache are replaced, by their heap copy.
        boolean needsHeapCopy = ctx.cacheObjectContext().binaryEnabled() && obj instanceof BinaryObjectOffheapImpl;

        if (needsHeapCopy)
            return ((BinaryObjectOffheapImpl)obj).heapCopy();

        return obj;
    }

    /**
     * Converts an object to its binary form unless it is {@code null} or already a binary object.
     *
     * @param obj Object.
     * @param failIfUnregistered Passed through to {@code marshalToBinary}.
     * @return Binary object.
     * @throws IgniteException In case of error.
     */
    @Nullable public Object toBinary(@Nullable Object obj, boolean failIfUnregistered) throws IgniteException {
        if (obj == null)
            return null;

        return isBinaryObject(obj) ? obj : marshalToBinary(obj, failIfUnregistered);
    }

    /** {@inheritDoc} */
    @Nullable @Override public IgniteNodeValidationResult validateNode(ClusterNode rmtNode,
        DiscoveryDataBag.JoiningNodeDiscoveryData discoData
    ) {
        // Validation is skipped when the consistency check is disabled or the marshaller is not binary.
        boolean skip = getBoolean(IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK) || !(marsh instanceof BinaryMarshaller);

        if (skip)
            return null;

        // Configuration mismatch takes precedence over metadata conflicts.
        IgniteNodeValidationResult cfgRes = validateBinaryConfiguration(rmtNode);

        if (cfgRes != null)
            return cfgRes;

        UUID rmtNodeId = rmtNode.id();

        Map<Integer, BinaryMetadataHolder> joiningMeta =
            (Map<Integer, BinaryMetadataHolder>)discoData.joiningNodeData();

        return validateBinaryMetadata(rmtNodeId, joiningMeta);
    }

    /**
     * Compares the binary configuration attribute of the remote node with the local node's one.
     *
     * @param rmtNode Remote node.
     * @return Validation result describing the mismatch, or {@code null} if the attributes are equal.
     */
    private IgniteNodeValidationResult validateBinaryConfiguration(ClusterNode rmtNode) {
        Object rmtBinaryCfg = rmtNode.attribute(IgniteNodeAttributes.ATTR_BINARY_CONFIGURATION);

        ClusterNode locNode = ctx.discovery().localNode();

        Object locBinaryCfg = locNode.attribute(IgniteNodeAttributes.ATTR_BINARY_CONFIGURATION);

        if (F.eq(locBinaryCfg, rmtBinaryCfg))
            return null;

        String fmt = "Local node's binary configuration is not equal to remote node's binary configuration " +
            "[locNodeId=%s, rmtNodeId=%s, locBinaryCfg=%s, rmtBinaryCfg=%s]";

        // The second message is formatted with the local/remote roles swapped.
        String firstMsg = String.format(fmt, locNode.id(), rmtNode.id(), locBinaryCfg, rmtBinaryCfg);
        String secondMsg = String.format(fmt, rmtNode.id(), locNode.id(), rmtBinaryCfg, locBinaryCfg);

        return new IgniteNodeValidationResult(rmtNode.id(), firstMsg, secondMsg);
    }

    /**
     * Checks that metadata sent by a joining node can be merged with the locally cached metadata.
     *
     * @param rmtNodeId ID of the joining node.
     * @param newNodeMeta Metadata received from the joining node, may be {@code null}.
     * @return Validation result describing the first merge failure, or {@code null} if nothing conflicts.
     */
    private IgniteNodeValidationResult validateBinaryMetadata(UUID rmtNodeId, Map<Integer, BinaryMetadataHolder> newNodeMeta) {
        if (newNodeMeta == null)
            return null;

        for (Map.Entry<Integer, BinaryMetadataHolder> metaEntry : newNodeMeta.entrySet()) {
            // Single lookup instead of containsKey() + get(): the entry may be removed between the two
            // calls, which would make the dereference of get() fail with NullPointerException.
            BinaryMetadataHolder locHolder = metadataLocCache.get(metaEntry.getKey());

            if (locHolder == null)
                continue;

            BinaryMetadata locMeta = locHolder.metadata();
            BinaryMetadata rmtMeta = metaEntry.getValue().metadata();

            if (locMeta == null || rmtMeta == null)
                continue;

            try {
                // Only the ability to merge is checked here; the merged result is discarded.
                mergeMetadata(locMeta, rmtMeta);
            }
            catch (Exception e) {
                String locMsg = "Exception was thrown when merging binary metadata from node %s: %s";

                String rmtMsg = "Exception was thrown on coordinator when merging binary metadata from this node: %s";

                return new IgniteNodeValidationResult(rmtNodeId,
                    String.format(locMsg, rmtNodeId.toString(), e.getMessage()),
                    String.format(rmtMsg, e.getMessage()));
            }
        }

        return null;
    }

    /** {@inheritDoc} */
    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
        // Same constant whose ordinal keys this processor's data in the discovery data bag.
        return BINARY_PROC;
    }

    /** {@inheritDoc} */
    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
        // Nothing to do when common data for this processor has already been collected.
        if (dataBag.commonDataCollectedFor(BINARY_PROC.ordinal()))
            return;

        Map<Integer, BinaryMetadataHolder> snapshot = U.newHashMap(metadataLocCache.size());

        for (Map.Entry<Integer, BinaryMetadataHolder> cached : metadataLocCache.entrySet()) {
            BinaryMetadataHolder hld = cached.getValue();

            // Holders flagged as removing are not shared.
            if (hld.removing())
                continue;

            snapshot.put(cached.getKey(), hld);
        }

        dataBag.addGridCommonData(BINARY_PROC.ordinal(), (Serializable) snapshot);
    }

    /** {@inheritDoc} */
    @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
        // Copy of the whole local metadata cache; unlike collectGridNodeData() nothing is filtered out.
        Map<Integer, BinaryMetadataHolder> snapshot = U.newHashMap(metadataLocCache.size());

        snapshot.putAll(metadataLocCache);

        dataBag.addJoiningNodeData(BINARY_PROC.ordinal(), (Serializable) snapshot);
    }

    /** {@inheritDoc} */
    @Override public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData data) {
        Map<Integer,BinaryMetadataHolder> newNodeMeta = (Map<Integer, BinaryMetadataHolder>) data.joiningNodeData();

        if (newNodeMeta == null)
            return;

        UUID joiningNode = data.joiningNodeId();

        for (Map.Entry<Integer, BinaryMetadataHolder> metaEntry : newNodeMeta.entrySet()) {
            // Single lookup instead of containsKey() + get(): the entry may be removed between the two
            // calls, which would make the dereference of get() fail with NullPointerException.
            BinaryMetadataHolder localMetaHolder = metadataLocCache.get(metaEntry.getKey());

            if (localMetaHolder != null) {
                BinaryMetadata newMeta = metaEntry.getValue().metadata();
                BinaryMetadata localMeta = localMetaHolder.metadata();

                BinaryMetadata mergedMeta = mergeMetadata(localMeta, newMeta);

                // A merge result that is a different instance than the local metadata is treated as an update.
                if (mergedMeta != localMeta) {
                    // Put mergedMeta to local cache and store to disk.
                    U.log(log,
                        String.format("Newer version of existing BinaryMetadata[typeId=%d, typeName=%s] " +
                                "is received from node %s; updating it locally",
                            mergedMeta.typeId(),
                            mergedMeta.typeName(),
                            joiningNode));

                    // Versions of the existing holder are kept; only the metadata is replaced.
                    metadataLocCache.put(metaEntry.getKey(),
                        new BinaryMetadataHolder(mergedMeta,
                            localMetaHolder.pendingVersion(),
                            localMetaHolder.acceptedVersion()));

                    if (!ctx.clientNode())
                        metadataFileStore.writeMetadata(mergedMeta);
                }
            }
            else {
                // Type is unknown locally: take the joining node's holder as is.
                BinaryMetadataHolder newMetaHolder = metaEntry.getValue();
                BinaryMetadata newMeta = newMetaHolder.metadata();

                U.log(log,
                    String.format("New BinaryMetadata[typeId=%d, typeName=%s] " +
                            "is received from node %s; adding it locally",
                        newMeta.typeId(),
                        newMeta.typeName(),
                        joiningNode));

                metadataLocCache.put(metaEntry.getKey(), newMetaHolder);

                if (!ctx.clientNode())
                    metadataFileStore.writeMetadata(newMeta);
            }
        }
    }

    /** {@inheritDoc} */
    @Override public void onGridDataReceived(GridDiscoveryData data) {
        Map<Integer, BinaryMetadataHolder> clusterMeta = (Map<Integer, BinaryMetadataHolder>) data.commonData();

        if (clusterMeta == null)
            return;

        for (Map.Entry<Integer, BinaryMetadataHolder> received : clusterMeta.entrySet()) {
            BinaryMetadataHolder rmtHolder = received.getValue();

            // The local copy uses the received pending version for both the pending and accepted versions.
            BinaryMetadataHolder locHolder = new BinaryMetadataHolder(rmtHolder.metadata(),
                rmtHolder.pendingVersion(),
                rmtHolder.pendingVersion());

            if (log.isDebugEnabled())
                log.debug("Received metadata on join: " + locHolder);

            metadataLocCache.put(received.getKey(), locHolder);

            // The file store write is skipped on client nodes.
            if (!ctx.clientNode())
                metadataFileStore.writeMetadata(rmtHolder.metadata());
        }
    }

    /**
     * Sets path to binary metadata store configured by user, should include binary_meta and consistentId.
     *
     * @param binaryMetadataFileStoreDir Path to binary_meta, may be {@code null}.
     */
    public void setBinaryMetadataFileStoreDir(@Nullable File binaryMetadataFileStoreDir) {
        this.binaryMetadataFileStoreDir = binaryMetadataFileStoreDir;
    }

    /** {@inheritDoc} */
    @Override public void removeType(int typeId) {
        BinaryMetadataHolder curHld = metadataLocCache.get(typeId);

        // Preconditions: the type is known locally, is not already being removed, and every node
        // in the cluster supports metadata removal.
        if (curHld == null)
            throw new IgniteException("Failed to remove metadata, type not found: " + typeId);

        if (curHld.removing())
            throw new IgniteException("Failed to remove metadata, type is being removed: " + typeId);

        boolean supported = IgniteFeatures.allNodesSupports(ctx.discovery().allNodes(), IgniteFeatures.REMOVE_METADATA);

        if (!supported) {
            throw new IgniteException("Failed to remove metadata, " +
                "all cluster nodes must support the remove type feature");
        }

        try {
            // Blocks until the removal request issued through the transport completes.
            MetadataUpdateResult res = transport.requestMetadataRemove(typeId).get();

            if (res.rejected())
                throw res.error();
        }
        catch (IgniteCheckedException e) {
            IgniteCheckedException cause;

            if (ctx.isStopping()) {
                // Report node stop as the cause and keep the original failure as suppressed.
                cause = new NodeStoppingException("Node is stopping.");

                cause.addSuppressed(e);
            }
            else
                cause = e;

            throw new BinaryObjectException("Failed to remove metadata for type: " + typeId, cause);
        }
    }

    /** Binary context that notifies registered listeners after metadata requests and before metadata updates. */
    @SuppressWarnings("PublicInnerClass")
    public static class TestBinaryContext extends BinaryContext {
        /** Listener of metadata requests and updates. */
        public interface TestBinaryContextListener {
            /**
             * Called after metadata has been requested from the parent context.
             *
             * @param typeId Type id.
             * @param type Type.
             */
            void onAfterMetadataRequest(int typeId, BinaryType type);

            /**
             * Called before a metadata update is passed to the parent context.
             *
             * @param typeId Type id.
             * @param metadata Metadata.
             */
            void onBeforeMetadataUpdate(int typeId, BinaryMetadata metadata);
        }

        /** Registered listeners; the list is created on first registration. */
        private List<TestBinaryContextListener> listeners;

        /**
         * @param metaHnd Meta handler.
         * @param igniteCfg Ignite config.
         * @param log Logger.
         */
        public TestBinaryContext(BinaryMetadataHandler metaHnd, IgniteConfiguration igniteCfg,
            IgniteLogger log) {
            super(metaHnd, igniteCfg, log);
        }

        /** {@inheritDoc} */
        @Nullable @Override public BinaryType metadata(int typeId) throws BinaryObjectException {
            BinaryType res = super.metadata(typeId);

            if (listeners == null)
                return res;

            // Listeners see the result only after the parent call has returned.
            for (TestBinaryContextListener lsnr : listeners)
                lsnr.onAfterMetadataRequest(typeId, res);

            return res;
        }

        /** {@inheritDoc} */
        @Override public void updateMetadata(int typeId, BinaryMetadata meta,
            boolean failIfUnregistered) throws BinaryObjectException {
            // Listeners are notified first, then the update goes to the parent context.
            if (listeners != null) {
                for (TestBinaryContextListener lsnr : listeners)
                    lsnr.onBeforeMetadataUpdate(typeId, meta);
            }

            super.updateMetadata(typeId, meta, failIfUnregistered);
        }

        /**
         * Registers a listener unless it is already registered.
         *
         * @param lsnr Listener.
         */
        public void addListener(TestBinaryContextListener lsnr) {
            if (listeners == null)
                listeners = new ArrayList<>();

            if (listeners.contains(lsnr))
                return;

            listeners.add(lsnr);
        }

        /** Removes all registered listeners, if any were registered. */
        public void clearAllListener() {
            if (listeners == null)
                return;

            listeners.clear();
        }
    }
}
