/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.client.thin;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.cache.Cache;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.IndexQuery;
import org.apache.ignite.cache.query.IndexQueryCriterion;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.SqlQuery;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.ClientDisconnectListener;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.cache.query.RangeIndexQueryCriterion;
import org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.internal.binary.GridBinaryMarshaller.ARR_LIST;
import static org.apache.ignite.internal.client.thin.ProtocolVersionFeature.EXPIRY_POLICY;
import static org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy.convertDuration;

/**
 * Implementation of {@link ClientCache} over TCP protocol.
 */
public class TcpClientCache<K, V> implements ClientCache<K, V> {
    /** Bit mask ({@code 0x01}) of the "keep binary" flag. */
    private static final byte KEEP_BINARY_FLAG_MASK = 0x01;

    /** Bit mask ({@code 0x02}) of the "transactional" flag. */
    private static final byte TRANSACTIONAL_FLAG_MASK = 0x02;

    /** Bit mask ({@code 0x04}) of the "with expiry policy" flag. */
    private static final byte WITH_EXPIRY_POLICY_FLAG_MASK = 0x04;

    /** Platform type: Java platform. Written right after a query filter object (see {@code scanQuery}). */
    static final byte JAVA_PLATFORM = 1;

    /** Cache id, derived from the cache name via {@link ClientUtils#cacheId(String)}. */
    private final int cacheId;

    /** Channel through which all cache requests of this instance are sent. */
    private final ReliableChannel ch;

    /** Cache name. */
    private final String name;

    /** Marshaller. */
    private final ClientBinaryMarshaller marsh;

    /** Transactions facade. */
    private final TcpClientTransactions transactions;

    /** Serializer/deserializer, created from {@link #marsh} in the constructor. */
    private final ClientUtils serDes;

    /** Indicates if cache works with Ignite Binary format. */
    private final boolean keepBinary;

    /** Expiry policy; {@code null} when the cache was created without one. */
    private final ExpiryPolicy expiryPlc;

    /** Cache entry listeners registry. */
    private final ClientCacheEntryListenersRegistry lsnrsRegistry;

    /** JCache adapter wrapping this cache; handed to cache entry listener handlers. */
    private final Cache<K, V> jCacheAdapter;

    /**
     * Creates a cache facade with "keep binary" switched off and no expiry policy.
     *
     * @param name Cache name.
     * @param ch Channel.
     * @param marsh Marshaller.
     * @param transactions Transactions facade.
     * @param lsnrsRegistry Cache entry listeners registry.
     */
    TcpClientCache(
        String name,
        ReliableChannel ch,
        ClientBinaryMarshaller marsh,
        TcpClientTransactions transactions,
        ClientCacheEntryListenersRegistry lsnrsRegistry
    ) {
        // Delegates to the full constructor: keepBinary = false, expiryPlc = null.
        this(name, ch, marsh, transactions, lsnrsRegistry, false, null);
    }

    /**
     * Creates a cache facade with an explicit "keep binary" mode and expiry policy.
     *
     * @param name Cache name; the cache id is derived from it.
     * @param ch Channel.
     * @param marsh Marshaller.
     * @param transactions Transactions facade.
     * @param lsnrsRegistry Cache entry listeners registry.
     * @param keepBinary Whether the cache works with Ignite Binary format.
     * @param expiryPlc Expiry policy, may be {@code null}.
     */
    TcpClientCache(
        String name,
        ReliableChannel ch,
        ClientBinaryMarshaller marsh,
        TcpClientTransactions transactions,
        ClientCacheEntryListenersRegistry lsnrsRegistry,
        boolean keepBinary,
        ExpiryPolicy expiryPlc
    ) {
        // Plain state supplied by the caller.
        this.name = name;
        this.ch = ch;
        this.marsh = marsh;
        this.transactions = transactions;
        this.lsnrsRegistry = lsnrsRegistry;
        this.keepBinary = keepBinary;
        this.expiryPlc = expiryPlc;

        // Derived state.
        this.cacheId = ClientUtils.cacheId(name);
        this.serDes = new ClientUtils(marsh);

        // The adapter receives 'this', so it is created only after every other field is assigned.
        this.jCacheAdapter = new ClientJCacheAdapter<>(this);

        this.ch.registerCacheIfCustomAffinity(this.name);
    }

    /** {@inheritDoc} */
    @Override public V get(K key) throws ClientException {
        // A null key is rejected before anything is handed to the single-key helper.
        if (key == null) {
            throw new NullPointerException("key");
        }

        // No extra request writer is needed; the response is decoded by readObject.
        return cacheSingleKeyOperation(key, ClientOperation.CACHE_GET, null, this::readObject);
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<V> getAsync(K key) {
        // The null check is synchronous: it throws here instead of failing the returned future.
        if (key == null) {
            throw new NullPointerException("key");
        }

        // No extra request writer is needed; the response is decoded by readObject.
        return cacheSingleKeyOperationAsync(key, ClientOperation.CACHE_GET, null, this::readObject);
    }

    /** {@inheritDoc} */
    @Override public void put(K key, V val) throws ClientException {
        // The key is validated first, so a null key is reported even when the value is null as well.
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // The value is written by the request writer; no response reader is supplied.
        cacheSingleKeyOperation(key, ClientOperation.CACHE_PUT, payload -> writeObject(payload, val), null);
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Void> putAsync(K key, V val) throws ClientException {
        // Arguments are validated synchronously, key before value.
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // The value is written by the request writer; no response reader is supplied.
        return cacheSingleKeyOperationAsync(key, ClientOperation.CACHE_PUT, payload -> writeObject(payload, val), null);
    }

    /** {@inheritDoc} */
    @Override public boolean containsKey(K key) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        // The response is a single boolean.
        return cacheSingleKeyOperation(key, ClientOperation.CACHE_CONTAINS_KEY, null, resp -> resp.in().readBoolean());
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Boolean> containsKeyAsync(K key) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        // The response is a single boolean.
        return cacheSingleKeyOperationAsync(
            key, ClientOperation.CACHE_CONTAINS_KEY, null, resp -> resp.in().readBoolean());
    }

    /** {@inheritDoc} */
    @Override public boolean containsKeys(Set<? extends K> keys) throws ClientException {
        if (keys == null) {
            throw new NullPointerException("keys");
        }

        // An empty key set is answered locally with 'true'; no request is sent.
        if (keys.isEmpty()) {
            return true;
        }

        return ch.service(
            ClientOperation.CACHE_CONTAINS_KEYS,
            payload -> writeKeys(keys, payload),
            resp -> resp.in().readBoolean()
        );
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Boolean> containsKeysAsync(Set<? extends K> keys) throws ClientException {
        if (keys == null) {
            throw new NullPointerException("keys");
        }

        // An empty key set is answered locally with an already completed 'true' future.
        if (keys.isEmpty()) {
            return IgniteClientFutureImpl.completedFuture(true);
        }

        return ch.serviceAsync(
            ClientOperation.CACHE_CONTAINS_KEYS,
            payload -> writeKeys(keys, payload),
            resp -> resp.in().readBoolean()
        );
    }

    /** {@inheritDoc} */
    @Override public String getName() {
        // The name is fixed at construction time.
        return this.name;
    }

    /** {@inheritDoc} */
    @Override public ClientCacheConfiguration getConfiguration() throws ClientException {
        // Request carries only the cache info; the response is decoded by getClientCacheConfiguration.
        return ch.service(ClientOperation.CACHE_GET_CONFIGURATION, this::writeCacheInfo, this::getClientCacheConfiguration);
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<ClientCacheConfiguration> getConfigurationAsync() throws ClientException {
        // Request carries only the cache info; the response is decoded by getClientCacheConfiguration.
        return ch.serviceAsync(
            ClientOperation.CACHE_GET_CONFIGURATION, this::writeCacheInfo, this::getClientCacheConfiguration);
    }

    /** {@inheritDoc} */
    @Override public int size(CachePeekMode... peekModes) throws ClientException {
        return ch.service(
            ClientOperation.CACHE_GET_SIZE,
            payload -> {
                // Cache info first, then every peek mode encoded as its enum ordinal in one byte.
                writeCacheInfo(payload);
                ClientUtils.collection(peekModes, payload.out(), (stream, mode) -> stream.writeByte((byte)mode.ordinal()));
            },
            // The response carries a long, which is narrowed to int here.
            resp -> (int)resp.in().readLong()
        );
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws ClientException {
        return ch.serviceAsync(
            ClientOperation.CACHE_GET_SIZE,
            payload -> {
                // Cache info first, then every peek mode encoded as its enum ordinal in one byte.
                writeCacheInfo(payload);
                ClientUtils.collection(peekModes, payload.out(), (stream, mode) -> stream.writeByte((byte)mode.ordinal()));
            },
            // The response carries a long, which is narrowed to int here.
            resp -> (int)resp.in().readLong()
        );
    }

    /** {@inheritDoc} */
    @Override public Map<K, V> getAll(Set<? extends K> keys) throws ClientException {
        if (keys == null) {
            throw new NullPointerException("keys");
        }

        // Nothing to look up: answer locally with a fresh, empty map.
        if (keys.isEmpty()) {
            return new HashMap<>();
        }

        return ch.service(
            ClientOperation.CACHE_GET_ALL,
            payload -> writeKeys(keys, payload),
            this::readEntries
        );
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Map<K, V>> getAllAsync(Set<? extends K> keys) throws ClientException {
        if (keys == null) {
            throw new NullPointerException("keys");
        }

        // Nothing to look up: return an already completed future holding a fresh, empty map.
        if (keys.isEmpty()) {
            return IgniteClientFutureImpl.completedFuture(new HashMap<>());
        }

        return ch.serviceAsync(
            ClientOperation.CACHE_GET_ALL,
            payload -> writeKeys(keys, payload),
            this::readEntries
        );
    }

    /** {@inheritDoc} */
    @Override public void putAll(Map<? extends K, ? extends V> map) throws ClientException {
        if (map == null) {
            throw new NullPointerException("map");
        }

        // A request is issued only when there is at least one entry to store.
        if (!map.isEmpty()) {
            ch.request(ClientOperation.CACHE_PUT_ALL, payload -> writeEntries(map, payload));
        }
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) throws ClientException {
        // Same argument contract as putAll(Map): reject null here, synchronously, instead of leaving it
        // to whatever the request writer does with a null map.
        if (map == null)
            throw new NullPointerException("map");

        // An empty map needs no request; return an already completed future,
        // as the other bulk async operations of this class do for empty input.
        if (map.isEmpty())
            return IgniteClientFutureImpl.completedFuture(null);

        return ch.requestAsync(ClientOperation.CACHE_PUT_ALL, req -> writeEntries(map, req));
    }

    /** {@inheritDoc} */
    @Override public boolean replace(K key, V oldVal, V newVal) throws ClientException {
        // Validation order matters for the reported name: key, then old value, then new value.
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (oldVal == null) {
            throw new NullPointerException("oldVal");
        }

        if (newVal == null) {
            throw new NullPointerException("newVal");
        }

        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_REPLACE_IF_EQUALS,
            payload -> {
                // Expected value first, replacement second.
                writeObject(payload, oldVal);
                writeObject(payload, newVal);
            },
            resp -> resp.in().readBoolean()
        );
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) throws ClientException {
        // Validation order matters for the reported name: key, then old value, then new value.
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (oldVal == null) {
            throw new NullPointerException("oldVal");
        }

        if (newVal == null) {
            throw new NullPointerException("newVal");
        }

        return cacheSingleKeyOperationAsync(
            key,
            ClientOperation.CACHE_REPLACE_IF_EQUALS,
            payload -> {
                // Expected value first, replacement second.
                writeObject(payload, oldVal);
                writeObject(payload, newVal);
            },
            resp -> resp.in().readBoolean()
        );
    }

    /** {@inheritDoc} */
    @Override public boolean replace(K key, V val) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // Request: the new value. Response: a single boolean.
        return cacheSingleKeyOperation(
            key, ClientOperation.CACHE_REPLACE, payload -> writeObject(payload, val), resp -> resp.in().readBoolean());
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Boolean> replaceAsync(K key, V val) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // Request: the new value. Response: a single boolean.
        return cacheSingleKeyOperationAsync(
            key, ClientOperation.CACHE_REPLACE, payload -> writeObject(payload, val), resp -> resp.in().readBoolean());
    }

    /** {@inheritDoc} */
    @Override public boolean remove(K key) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        // The response is a single boolean.
        return cacheSingleKeyOperation(key, ClientOperation.CACHE_REMOVE_KEY, null, resp -> resp.in().readBoolean());
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Boolean> removeAsync(K key) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        // The response is a single boolean.
        return cacheSingleKeyOperationAsync(
            key, ClientOperation.CACHE_REMOVE_KEY, null, resp -> resp.in().readBoolean());
    }

    /** {@inheritDoc} */
    @Override public boolean remove(K key, V oldVal) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (oldVal == null) {
            throw new NullPointerException("oldVal");
        }

        // Request: the expected value. Response: a single boolean.
        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_REMOVE_IF_EQUALS,
            payload -> writeObject(payload, oldVal),
            resp -> resp.in().readBoolean());
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Boolean> removeAsync(K key, V oldVal) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (oldVal == null) {
            throw new NullPointerException("oldVal");
        }

        // Request: the expected value. Response: a single boolean.
        return cacheSingleKeyOperationAsync(
            key,
            ClientOperation.CACHE_REMOVE_IF_EQUALS,
            payload -> writeObject(payload, oldVal),
            resp -> resp.in().readBoolean());
    }

    /** {@inheritDoc} */
    @Override public void removeAll(Set<? extends K> keys) throws ClientException {
        if (keys == null) {
            throw new NullPointerException("keys");
        }

        // A request is issued only when there is at least one key to remove.
        if (!keys.isEmpty()) {
            ch.request(ClientOperation.CACHE_REMOVE_KEYS, payload -> writeKeys(keys, payload));
        }
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Void> removeAllAsync(Set<? extends K> keys) throws ClientException {
        if (keys == null) {
            throw new NullPointerException("keys");
        }

        // Nothing to remove: return an already completed future without sending a request.
        if (keys.isEmpty()) {
            return IgniteClientFutureImpl.completedFuture(null);
        }

        return ch.requestAsync(ClientOperation.CACHE_REMOVE_KEYS, payload -> writeKeys(keys, payload));
    }

    /** {@inheritDoc} */
    @Override public void removeAll() throws ClientException {
        // The request carries only the cache info.
        ch.request(
            ClientOperation.CACHE_REMOVE_ALL,
            this::writeCacheInfo
        );
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Void> removeAllAsync() throws ClientException {
        // The request carries only the cache info.
        return ch.requestAsync(
            ClientOperation.CACHE_REMOVE_ALL,
            this::writeCacheInfo
        );
    }

    /** {@inheritDoc} */
    @Override public V getAndPut(K key, V val) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // Request: the new value. Response: decoded by readObject.
        return cacheSingleKeyOperation(
            key, ClientOperation.CACHE_GET_AND_PUT, payload -> writeObject(payload, val), this::readObject);
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<V> getAndPutAsync(K key, V val) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // Request: the new value. Response: decoded by readObject.
        return cacheSingleKeyOperationAsync(
            key, ClientOperation.CACHE_GET_AND_PUT, payload -> writeObject(payload, val), this::readObject);
    }

    /** {@inheritDoc} */
    @Override public V getAndRemove(K key) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        // No extra request writer; the response is decoded by readObject.
        return cacheSingleKeyOperation(key, ClientOperation.CACHE_GET_AND_REMOVE, null, this::readObject);
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<V> getAndRemoveAsync(K key) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        // No extra request writer; the response is decoded by readObject.
        return cacheSingleKeyOperationAsync(key, ClientOperation.CACHE_GET_AND_REMOVE, null, this::readObject);
    }

    /** {@inheritDoc} */
    @Override public V getAndReplace(K key, V val) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // Request: the new value. Response: decoded by readObject.
        return cacheSingleKeyOperation(
            key, ClientOperation.CACHE_GET_AND_REPLACE, payload -> writeObject(payload, val), this::readObject);
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<V> getAndReplaceAsync(K key, V val) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // Request: the new value. Response: decoded by readObject.
        return cacheSingleKeyOperationAsync(
            key, ClientOperation.CACHE_GET_AND_REPLACE, payload -> writeObject(payload, val), this::readObject);
    }

    /** {@inheritDoc} */
    @Override public boolean putIfAbsent(K key, V val) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // Request: the candidate value. Response: a single boolean.
        return cacheSingleKeyOperation(
            key,
            ClientOperation.CACHE_PUT_IF_ABSENT,
            payload -> writeObject(payload, val),
            resp -> resp.in().readBoolean());
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Boolean> putIfAbsentAsync(K key, V val) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // Request: the candidate value. Response: a single boolean.
        return cacheSingleKeyOperationAsync(
            key,
            ClientOperation.CACHE_PUT_IF_ABSENT,
            payload -> writeObject(payload, val),
            resp -> resp.in().readBoolean());
    }

    /** {@inheritDoc} */
    @Override public V getAndPutIfAbsent(K key, V val) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // Request: the candidate value. Response: decoded by readObject.
        return cacheSingleKeyOperation(
            key, ClientOperation.CACHE_GET_AND_PUT_IF_ABSENT, payload -> writeObject(payload, val), this::readObject);
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<V> getAndPutIfAbsentAsync(K key, V val) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        if (val == null) {
            throw new NullPointerException("val");
        }

        // Request: the candidate value. Response: decoded by readObject.
        return cacheSingleKeyOperationAsync(
            key, ClientOperation.CACHE_GET_AND_PUT_IF_ABSENT, payload -> writeObject(payload, val), this::readObject);
    }

    /** {@inheritDoc} */
    @Override public void clear() throws ClientException {
        // The request carries only the cache info.
        ch.request(
            ClientOperation.CACHE_CLEAR,
            this::writeCacheInfo
        );
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Void> clearAsync() throws ClientException {
        // The request carries only the cache info.
        return ch.requestAsync(
            ClientOperation.CACHE_CLEAR,
            this::writeCacheInfo
        );
    }

    /** {@inheritDoc} */
    @Override public void clear(K key) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        // Neither an extra request writer nor a response reader is supplied.
        cacheSingleKeyOperation(key, ClientOperation.CACHE_CLEAR_KEY, null, null);
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Void> clearAsync(K key) throws ClientException {
        if (key == null) {
            throw new NullPointerException("key");
        }

        // Neither an extra request writer nor a response reader is supplied.
        return cacheSingleKeyOperationAsync(key, ClientOperation.CACHE_CLEAR_KEY, null, null);
    }

    /** {@inheritDoc} */
    @Override public void clearAll(Set<? extends K> keys) throws ClientException {
        if (keys == null) {
            throw new NullPointerException("keys");
        }

        // A request is issued only when there is at least one key to clear.
        if (!keys.isEmpty()) {
            ch.request(ClientOperation.CACHE_CLEAR_KEYS, payload -> writeKeys(keys, payload));
        }
    }

    /** {@inheritDoc} */
    @Override public IgniteClientFuture<Void> clearAllAsync(Set<? extends K> keys) throws ClientException {
        if (keys == null) {
            throw new NullPointerException("keys");
        }

        // Nothing to clear: return an already completed future without sending a request.
        if (keys.isEmpty()) {
            return IgniteClientFutureImpl.completedFuture(null);
        }

        return ch.requestAsync(ClientOperation.CACHE_CLEAR_KEYS, payload -> writeKeys(keys, payload));
    }

    /** {@inheritDoc} */
    @Override public <K1, V1> ClientCache<K1, V1> withKeepBinary() {
        // Already in "keep binary" mode: this very instance is returned, only re-typed.
        if (keepBinary) {
            return (ClientCache<K1, V1>)this;
        }

        // Otherwise create a sibling with the same collaborators and expiry policy, keepBinary = true.
        return new TcpClientCache<>(name, ch, marsh, transactions, lsnrsRegistry, true, expiryPlc);
    }

    /** {@inheritDoc} */
    @Override public <K1, V1> ClientCache<K1, V1> withExpirePolicy(ExpiryPolicy expirePlc) {
        // Always a new instance: same collaborators and keepBinary mode, with the given policy.
        return new TcpClientCache<>(
            name,
            ch,
            marsh,
            transactions,
            lsnrsRegistry,
            keepBinary,
            expirePlc
        );
    }

    /** {@inheritDoc} */
    @SuppressWarnings("unchecked")
    @Override public <R> QueryCursor<R> query(Query<R> qry) {
        if (qry == null) {
            throw new NullPointerException("qry");
        }

        // Dispatch on the concrete query class; every supported kind returns its cursor directly.
        if (qry instanceof ScanQuery) {
            return scanQuery((ScanQuery)qry);
        }

        if (qry instanceof SqlQuery) {
            return (QueryCursor<R>)sqlQuery((SqlQuery)qry);
        }

        if (qry instanceof SqlFieldsQuery) {
            return (QueryCursor<R>)query((SqlFieldsQuery)qry);
        }

        if (qry instanceof ContinuousQuery) {
            // No disconnect listener is passed on this path.
            return query((ContinuousQuery<K, V>)qry, null);
        }

        if (qry instanceof IndexQuery) {
            return indexQuery((IndexQuery)qry);
        }

        // Anything else is rejected.
        throw new IllegalArgumentException(
            String.format("Query of type [%s] is not supported", qry.getClass().getSimpleName())
        );
    }

    /** {@inheritDoc} */
    @Override public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry) {
        if (qry == null) {
            throw new NullPointerException("qry");
        }

        // Request body: the cache info, then the query itself written through serDes.
        Consumer<PayloadOutputChannel> reqWriter = payload -> {
            writeCacheInfo(payload);
            serDes.write(qry, payload.out());
        };

        return new ClientFieldsQueryCursor<>(
            new ClientFieldsQueryPager(
                ch,
                ClientOperation.QUERY_SQL_FIELDS,
                ClientOperation.QUERY_SQL_FIELDS_CURSOR_GET_PAGE,
                reqWriter,
                keepBinary,
                marsh
            )
        );
    }

    /** {@inheritDoc} */
    @Override public <R> QueryCursor<R> query(ContinuousQuery<K, V> qry, ClientDisconnectListener disconnectLsnr) {
        // Preconditions on the continuous query, checked before any handler is created.
        A.ensure(!(qry.getInitialQuery() instanceof ContinuousQuery), "Initial query for continuous query " +
            "can't be an instance of another continuous query");
        A.notNull(qry.getLocalListener(), "Local listener");
        A.ensure(!qry.isLocal(), "Local query is not supported by thin client");
        A.ensure(qry.isAutoUnsubscribe(), "AutoUnsubscribe flag is not supported by thin client");
        A.ensure(qry.getRemoteFilterFactory() == null || qry.getRemoteFilter() == null,
            "RemoteFilter and RemoteFilterFactory can't be used together");

        ClientCacheEntryListenerHandler<K, V> lsnrHnd =
            new ClientCacheEntryListenerHandler<>(jCacheAdapter, ch, marsh, keepBinary);

        // Filter factory: the explicit factory wins; otherwise a plain remote filter is wrapped into a factory;
        // otherwise there is no filter at all.
        lsnrHnd.startListen(
            qry.getLocalListener(),
            disconnectLsnr,
            qry.getRemoteFilterFactory() != null
                ? qry.getRemoteFilterFactory()
                : qry.getRemoteFilter() != null ? FactoryBuilder.factoryOf(qry.getRemoteFilter()) : null,
            qry.getPageSize(),
            qry.getTimeInterval(),
            qry.isIncludeExpired()
        );

        // Without an initial query the cursor wraps the handler only.
        if (qry.getInitialQuery() == null) {
            return new ClientContinuousQueryCursor<>(null, lsnrHnd);
        }

        try {
            QueryCursor<R> initCur = (QueryCursor<R>)query(qry.getInitialQuery());

            return new ClientContinuousQueryCursor<>(initCur, lsnrHnd);
        }
        catch (Exception e) {
            // The handler is already listening at this point; close it before propagating the failure.
            U.closeQuiet(lsnrHnd);

            throw e;
        }
    }

    /** {@inheritDoc} */
    @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg) {
        // Same as the two-argument overload, with no disconnect listener.
        ClientDisconnectListener noDisconnectLsnr = null;

        registerCacheEntryListener(cfg, noDisconnectLsnr);
    }

    /** {@inheritDoc} */
    @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg,
        ClientDisconnectListener disconnectLsnr) {
        A.ensure(!cfg.isSynchronous(),
            "Unsupported cfg.isSynchronous() flag value");

        A.notNull(cfg.getCacheEntryListenerFactory(), "cfg.getCacheEntryListenerFactory()");

        ClientCacheEntryListenerHandler<K, V> hnd = new ClientCacheEntryListenerHandler<>(
            jCacheAdapter,
            ch,
            marsh,
            keepBinary
        );

        // The registry entry is claimed first, so a duplicate configuration is rejected before anything starts.
        if (!lsnrsRegistry.registerCacheEntryListener(name, cfg, hnd))
            throw new IllegalStateException("Listener is already registered for configuration: " + cfg);

        try {
            CacheEntryListener<? super K, ? super V> locLsnr = cfg.getCacheEntryListenerFactory().create();

            ClientDisconnectListener disconnectLsnr0 = e -> {
                try {
                    if (disconnectLsnr != null)
                        disconnectLsnr.onDisconnected(e);
                }
                finally {
                    // Drop the registry entry even when the user-supplied listener throws.
                    lsnrsRegistry.deregisterCacheEntryListener(name, cfg);
                }
            };

            hnd.startListen(
                new ClientJCacheEntryListenerAdapter<>(locLsnr),
                disconnectLsnr0,
                cfg.getCacheEntryEventFilterFactory(),
                ContinuousQuery.DFLT_PAGE_SIZE,
                ContinuousQuery.DFLT_TIME_INTERVAL,
                // Expired events are requested only when the local listener can receive them.
                locLsnr instanceof CacheEntryExpiredListener
            );
        }
        catch (RuntimeException | Error err) {
            // Roll back the registration: otherwise a failed listener creation or start would leave the
            // configuration marked as registered and every retry would fail with IllegalStateException.
            lsnrsRegistry.deregisterCacheEntryListener(name, cfg);

            throw err;
        }
    }

    /** {@inheritDoc} */
    @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> cfg) {
        // Take the handler out of the registry first, then close it quietly.
        ClientCacheEntryListenerHandler<?, ?> removedHnd = lsnrsRegistry.deregisterCacheEntryListener(name, cfg);

        U.closeQuiet(removedHnd);
    }

    /**
     * Stores DR data: sends the given key to (value, version) mapping with the
     * {@code CACHE_PUT_ALL_CONFLICT} operation.
     *
     * @param drMap DR map, must not be {@code null}.
     * @throws ClientException If the request fails.
     */
    public void putAllConflict(Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> drMap) throws ClientException {
        A.notNull(drMap, "drMap");

        ch.request(
            ClientOperation.CACHE_PUT_ALL_CONFLICT,
            payload -> writePutAllConflict(drMap, payload)
        );
    }

    /**
     * Stores DR data asynchronously: sends the given key to (value, version) mapping with the
     * {@code CACHE_PUT_ALL_CONFLICT} operation.
     * <p>
     * The map value type is declared as {@code ? extends T2<...>} so that this method accepts exactly
     * the same maps as {@link #putAllConflict(Map)}.
     *
     * @param drMap DR map, must not be {@code null}.
     * @return Future.
     * @throws ClientException If the request fails.
     */
    public IgniteClientFuture<Void> putAllConflictAsync(Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> drMap)
        throws ClientException {
        A.notNull(drMap, "drMap");

        return ch.requestAsync(ClientOperation.CACHE_PUT_ALL_CONFLICT, req -> writePutAllConflict(drMap, req));
    }

    /**
     * Removes DR data: sends the given key to version mapping with the
     * {@code CACHE_REMOVE_ALL_CONFLICT} operation.
     *
     * @param drMap DR map, must not be {@code null}.
     * @throws ClientException If the request fails.
     */
    public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws ClientException {
        A.notNull(drMap, "drMap");

        ch.request(
            ClientOperation.CACHE_REMOVE_ALL_CONFLICT,
            payload -> writeRemoveAllConflict(drMap, payload)
        );
    }

    /**
     * Removes DR data asynchronously: sends the given key to version mapping with the
     * {@code CACHE_REMOVE_ALL_CONFLICT} operation.
     *
     * @param drMap DR map, must not be {@code null}.
     * @return Future.
     * @throws ClientException If the request fails.
     */
    public IgniteClientFuture<Void> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap)
        throws ClientException {
        A.notNull(drMap, "drMap");

        return ch.requestAsync(
            ClientOperation.CACHE_REMOVE_ALL_CONFLICT,
            payload -> writeRemoveAllConflict(drMap, payload)
        );
    }

    /**
     * Builds a cursor for a scan query.
     *
     * @param qry Scan query.
     * @return Cursor over cache entries, backed by a pager using the {@code QUERY_SCAN} operations.
     */
    private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
        Consumer<PayloadOutputChannel> reqWriter = payload -> {
            writeCacheInfo(payload);

            BinaryOutputStream stream = payload.out();

            // Filter slot: either the filter object followed by the Java platform byte, or a NULL marker.
            if (qry.getFilter() != null) {
                serDes.writeObject(stream, qry.getFilter());
                stream.writeByte(JAVA_PLATFORM);
            }
            else {
                stream.writeByte(GridBinaryMarshaller.NULL);
            }

            stream.writeInt(qry.getPageSize());

            // A missing partition is encoded as -1.
            Integer reqPart = qry.getPartition();
            stream.writeInt(reqPart == null ? -1 : reqPart);

            stream.writeBoolean(qry.isLocal());
        };

        // The pager gets the same "-1 means no partition" value, computed when the cursor is created.
        Integer qryPart = qry.getPartition();
        int pagerPart = qryPart == null ? -1 : qryPart;

        return new ClientQueryCursor<>(new ClientQueryPager<>(
            ch,
            ClientOperation.QUERY_SCAN,
            ClientOperation.QUERY_SCAN_CURSOR_GET_PAGE,
            reqWriter,
            keepBinary,
            marsh,
            cacheId,
            pagerPart
        ));
    }

    /**
     * Builds a cursor for the index query.
     * Only {@link RangeIndexQueryCriterion} criteria can be serialized; any other criterion type is rejected
     * with {@link IllegalArgumentException} while the request is being written.
     */
    private QueryCursor<Cache.Entry<K, V>> indexQuery(IndexQuery<K, V> qry) {
        Consumer<PayloadOutputChannel> qryWriter = req -> {
            writeCacheInfo(req);

            BinaryOutputStream out = req.out();

            try (BinaryRawWriterEx writer = new BinaryWriterExImpl(marsh.context(), out, null, null)) {
                writer.writeInt(qry.getPageSize());
                writer.writeBoolean(qry.isLocal());

                Integer part = qry.getPartition();

                writer.writeInt(part == null ? -1 : part);

                writer.writeString(qry.getValueType());
                writer.writeString(qry.getIndexName());

                if (qry.getCriteria() == null)
                    out.writeByte(GridBinaryMarshaller.NULL);
                else {
                    out.writeByte(ARR_LIST);
                    out.writeInt(qry.getCriteria().size());

                    for (IndexQueryCriterion crit : qry.getCriteria()) {
                        if (!(crit instanceof RangeIndexQueryCriterion)) {
                            throw new IllegalArgumentException(String.format(
                                "Unknown IndexQuery criterion type [%s]",
                                crit.getClass().getSimpleName()
                            ));
                        }

                        out.writeByte((byte)0); // Criterion type.

                        RangeIndexQueryCriterion range = (RangeIndexQueryCriterion)crit;

                        writer.writeString(range.field());
                        writer.writeBoolean(range.lowerIncl());
                        writer.writeBoolean(range.upperIncl());
                        writer.writeBoolean(range.lowerNull());
                        writer.writeBoolean(range.upperNull());

                        serDes.writeObject(out, range.lower());
                        serDes.writeObject(out, range.upper());
                    }
                }
            }

            // The filter goes last, after the writer above has been closed.
            if (qry.getFilter() != null) {
                serDes.writeObject(out, qry.getFilter());
                out.writeByte(JAVA_PLATFORM);
            }
            else
                out.writeByte(GridBinaryMarshaller.NULL);
        };

        // Partition passed to the pager; -1 when the query does not specify one.
        Integer qryPart = qry.getPartition();
        int pagerPart = qryPart == null ? -1 : qryPart;

        return new ClientQueryCursor<>(new ClientQueryPager<>(
            ch,
            ClientOperation.QUERY_INDEX,
            ClientOperation.QUERY_INDEX_CURSOR_GET_PAGE,
            qryWriter,
            keepBinary,
            marsh,
            cacheId,
            pagerPart
        ));
    }

    /**
     * Builds a cursor for the SQL query.
     * The request payload is: cache info, type, SQL text, arguments, distributed joins flag, local flag,
     * replicated-only flag, page size, timeout.
     */
    private QueryCursor<Cache.Entry<K, V>> sqlQuery(SqlQuery qry) {
        Consumer<PayloadOutputChannel> qryWriter = req -> {
            writeCacheInfo(req);

            BinaryOutputStream stream = req.out();

            // Query text and arguments.
            serDes.writeObject(stream, qry.getType());
            serDes.writeObject(stream, qry.getSql());
            ClientUtils.collection(qry.getArgs(), stream, serDes::writeObject);

            // Execution flags.
            stream.writeBoolean(qry.isDistributedJoins());
            stream.writeBoolean(qry.isLocal());
            stream.writeBoolean(qry.isReplicatedOnly());

            // Paging and timeout.
            stream.writeInt(qry.getPageSize());
            stream.writeLong(qry.getTimeout());
        };

        return new ClientQueryCursor<>(new ClientQueryPager<>(
            ch,
            ClientOperation.QUERY_SQL,
            ClientOperation.QUERY_SQL_CURSOR_GET_PAGE,
            qryWriter,
            keepBinary,
            marsh
        ));
    }

    /**
     * Executes a single-key cache operation synchronously.
     * The request consists of the cache info, the key and, if a writer is given, an additional payload.
     */
    private <T> T cacheSingleKeyOperation(
        K key,
        ClientOperation op,
        Consumer<PayloadOutputChannel> additionalPayloadWriter,
        Function<PayloadInputChannel, T> payloadReader
    ) throws ClientException {
        Consumer<PayloadOutputChannel> reqWriter = payloadCh -> {
            writeCacheInfo(payloadCh);
            writeObject(payloadCh, key);

            if (additionalPayloadWriter == null)
                return;

            additionalPayloadWriter.accept(payloadCh);
        };

        // Transactional operation cannot be executed on affinity node, it should be executed on node started
        // the transaction.
        if (transactions.tx() != null)
            return ch.service(op, reqWriter, payloadReader);

        return ch.affinityService(cacheId, key, op, reqWriter, payloadReader);
    }

    /**
     * Executes a single-key cache operation asynchronously.
     * The request consists of the cache info, the key and, if a writer is given, an additional payload.
     */
    private <T> IgniteClientFuture<T> cacheSingleKeyOperationAsync(
        K key,
        ClientOperation op,
        Consumer<PayloadOutputChannel> additionalPayloadWriter,
        Function<PayloadInputChannel, T> payloadReader
    ) throws ClientException {
        Consumer<PayloadOutputChannel> reqWriter = payloadCh -> {
            writeCacheInfo(payloadCh);
            writeObject(payloadCh, key);

            if (additionalPayloadWriter == null)
                return;

            additionalPayloadWriter.accept(payloadCh);
        };

        // Transactional operation cannot be executed on affinity node, it should be executed on node started
        // the transaction.
        if (transactions.tx() != null)
            return ch.serviceAsync(op, reqWriter, payloadReader);

        return ch.affinityServiceAsync(cacheId, key, op, reqWriter, payloadReader);
    }

    /**
     * Writes the cache ID and the flags byte, followed by the fields the flags announce:
     * three expiry durations (creation, update, access) if an expiry policy is set, then the transaction ID
     * if a transaction is active.
     */
    private void writeCacheInfo(PayloadOutputChannel payloadCh) {
        BinaryOutputStream stream = payloadCh.out();

        stream.writeInt(cacheId);

        byte flagBits = 0;

        if (keepBinary)
            flagBits |= KEEP_BINARY_FLAG_MASK;

        TcpClientTransaction curTx = transactions.tx();

        if (expiryPlc != null) {
            ProtocolContext ctx = payloadCh.clientChannel().protocolCtx();

            boolean expirySupported = ctx.isFeatureSupported(EXPIRY_POLICY);

            if (!expirySupported) {
                throw new ClientProtocolError(String.format("Expire policies are not supported by the server " +
                    "version %s, required version %s", ctx.version(), EXPIRY_POLICY.verIntroduced()));
            }

            flagBits |= WITH_EXPIRY_POLICY_FLAG_MASK;
        }

        if (curTx != null) {
            // The transaction must belong to the same channel the request is written to.
            boolean sameCh = curTx.clientChannel() == payloadCh.clientChannel();

            if (!sameCh) {
                throw new ClientException("Transaction context has been lost due to connection errors. " +
                    "Cache operations are prohibited until current transaction closed.");
            }

            flagBits |= TRANSACTIONAL_FLAG_MASK;
        }

        stream.writeByte(flagBits);

        boolean withExpiry = (flagBits & WITH_EXPIRY_POLICY_FLAG_MASK) != 0;

        if (withExpiry) {
            stream.writeLong(convertDuration(expiryPlc.getExpiryForCreation()));
            stream.writeLong(convertDuration(expiryPlc.getExpiryForUpdate()));
            stream.writeLong(convertDuration(expiryPlc.getExpiryForAccess()));
        }

        boolean transactional = (flagBits & TRANSACTIONAL_FLAG_MASK) != 0;

        if (transactional)
            stream.writeInt(curTx.txId());
    }

    /** Reads an object from the stream using this cache's keep-binary setting. */
    private <T> T readObject(BinaryInputStream in) {
        T obj = serDes.readObject(in, keepBinary);

        return obj;
    }

    /** Reads an object from the input stream of the payload channel. */
    private <T> T readObject(PayloadInputChannel payloadCh) {
        BinaryInputStream stream = payloadCh.in();

        return readObject(stream);
    }

    /** Writes an object to the output stream of the payload channel. */
    private void writeObject(PayloadOutputChannel payloadCh, Object obj) {
        BinaryOutputStream stream = payloadCh.out();

        serDes.writeObject(stream, obj);
    }

    /** Reads cache configuration from the response; yields {@code null} if reading fails with an I/O error. */
    @Nullable private ClientCacheConfiguration getClientCacheConfiguration(PayloadInputChannel res) {
        ClientCacheConfiguration cfg;

        try {
            cfg = serDes.cacheConfiguration(res.in(), res.clientChannel().protocolCtx());
        }
        catch (IOException ignored) {
            // An unreadable configuration is reported to the caller as an absent one.
            cfg = null;
        }

        return cfg;
    }

    /** Writes the cache info followed by the collection of keys. */
    private void writeKeys(Set<? extends K> keys, PayloadOutputChannel req) {
        writeCacheInfo(req);

        BinaryOutputStream stream = req.out();

        ClientUtils.collection(keys, stream, serDes::writeObject);
    }

    /** Reads an entry count and then that many key-value pairs from the response into a new map. */
    private Map<K, V> readEntries(PayloadInputChannel res) {
        BinaryInputStream stream = res.in();

        Map<K, V> entries = new HashMap<>();

        for (int left = stream.readInt(); left > 0; left--) {
            // The key precedes its value on the wire.
            K key = readObject(stream);
            V val = readObject(stream);

            entries.put(key, val);
        }

        return entries;
    }

    /** Writes the cache info followed by the map entries, each as a key and then its value. */
    private void writeEntries(Map<? extends K, ? extends V> map, PayloadOutputChannel req) {
        writeCacheInfo(req);

        BinaryOutputStream stream = req.out();

        ClientUtils.collection(
            map.entrySet(),
            stream,
            (entryOut, entry) -> {
                serDes.writeObject(entryOut, entry.getKey());
                serDes.writeObject(entryOut, entry.getValue());
            });
    }

    /**
     * Verifies data replication support, then writes the cache info followed by the map entries,
     * each as a key, a value and a version.
     */
    private void writePutAllConflict(
        Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> map,
        PayloadOutputChannel req
    ) {
        checkDataReplicationSupported(req.clientChannel().protocolCtx());

        writeCacheInfo(req);

        BinaryOutputStream stream = req.out();

        ClientUtils.collection(
            map.entrySet(),
            stream,
            (entryOut, entry) -> {
                T2<? extends V, GridCacheVersion> valWithVer = entry.getValue();

                serDes.writeObject(entryOut, entry.getKey());
                serDes.writeObject(entryOut, valWithVer.get1());
                serDes.writeObject(entryOut, valWithVer.get2());
            });
    }

    /**
     * Verifies data replication support, then writes the cache info followed by the map entries,
     * each as a key and a version.
     */
    private void writeRemoveAllConflict(Map<? extends K, GridCacheVersion> map, PayloadOutputChannel req) {
        checkDataReplicationSupported(req.clientChannel().protocolCtx());

        writeCacheInfo(req);

        BinaryOutputStream stream = req.out();

        ClientUtils.collection(
            map.entrySet(),
            stream,
            (entryOut, entry) -> {
                serDes.writeObject(entryOut, entry.getKey());
                serDes.writeObject(entryOut, entry.getValue());
            });
    }

    /**
     * Checks that data replication operations are supported by the server.
     *
     * @param protocolCtx Protocol context.
     * @throws ClientFeatureNotSupportedByServerException If the feature is not supported.
     */
    private void checkDataReplicationSupported(ProtocolContext protocolCtx)
        throws ClientFeatureNotSupportedByServerException {
        ProtocolBitmaskFeature feature = ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS;

        if (protocolCtx.isFeatureSupported(feature))
            return;

        throw new ClientFeatureNotSupportedByServerException(feature);
    }
}
