/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.client;

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.cache.expiry.AccessedExpiryPolicy;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ModifiedExpiryPolicy;
import com.google.common.collect.ImmutableSet;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheKeyConfiguration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.QueryIndex;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.AbstractBinaryArraysTest;
import org.apache.ignite.internal.client.thin.ClientServerError;
import org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.spi.systemview.view.SystemView;
import org.apache.ignite.spi.systemview.view.TransactionView;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

import static org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum.VAL1;
import static org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum.VAL2;
import static org.apache.ignite.internal.processors.cache.CacheEnumOperationsAbstractTest.TestEnum.VAL3;
import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.TXS_MON_LIST;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
import static org.junit.Assert.assertArrayEquals;

/**
 * Thin client functional tests.
 */
public class FunctionalTest extends AbstractBinaryArraysTest {
    /** Per-test timeout: JUnit rule initialized from {@link GridTestUtils#DFLT_TEST_TIMEOUT} (narrowed to {@code int}). */
    @SuppressWarnings("deprecation")
    @Rule
    public Timeout globalTimeout = new Timeout((int)GridTestUtils.DFLT_TEST_TIMEOUT);

    /**
     * Tested API:
     * <ul>
     * <li>{@link IgniteClient#cache(String)}</li>
     * <li>{@link IgniteClient#getOrCreateCache(ClientCacheConfiguration)}</li>
     * <li>{@link IgniteClient#cacheNames()}</li>
     * <li>{@link IgniteClient#createCache(String)}</li>
     * <li>{@link IgniteClient#createCache(ClientCacheConfiguration)}</li>
     * <li>{@link IgniteClient#destroyCache(String)}</li>
     * <li>{@link ClientCache#size(CachePeekMode...)}</li>
     * </ul>
     *
     * @throws Exception If the cluster or the client fails to start or stop.
     */
    @Test
    public void testCacheManagement() throws Exception {
        try (LocalIgniteCluster ignored = LocalIgniteCluster.start(2);
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            final String CACHE_NAME = "testCacheManagement";

            // Expected names in natural order. Actual names are always sorted the same way before comparison,
            // so the test does not depend on the iteration order of IgniteClient#cacheNames().
            Object[] expCacheNames = new TreeSet<>(Arrays.asList(Config.DEFAULT_CACHE_NAME, CACHE_NAME)).toArray();

            ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration().setName(CACHE_NAME)
                .setCacheMode(CacheMode.REPLICATED)
                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

            int key = 1;
            Person val = new Person(key, Integer.toString(key));

            ClientCache<Integer, Person> cache = client.getOrCreateCache(cacheCfg);

            cache.put(key, val);

            // REPLICATED cache on a two-node cluster: a single logical entry,
            // two entries when every peek mode is counted (presumably one copy per node).
            assertEquals(1, cache.size());
            assertEquals(2, cache.size(CachePeekMode.ALL));

            // The cache must be reachable by name and return the stored value.
            cache = client.cache(CACHE_NAME);

            Person cachedVal = cache.get(key);

            assertEquals(val, cachedVal);

            assertArrayEquals(expCacheNames, new TreeSet<>(client.cacheNames()).toArray());

            // Destroying the cache removes its name.
            client.destroyCache(CACHE_NAME);

            assertArrayEquals(new Object[] {Config.DEFAULT_CACHE_NAME}, client.cacheNames().toArray());

            // Re-creating the cache by name yields an empty cache and restores the name.
            cache = client.createCache(CACHE_NAME);

            assertFalse(cache.containsKey(key));

            assertArrayEquals(expCacheNames, new TreeSet<>(client.cacheNames()).toArray());

            // Same check for creation from a full configuration.
            client.destroyCache(CACHE_NAME);

            cache = client.createCache(cacheCfg);

            assertFalse(cache.containsKey(key));

            // Names are re-read here: asserting on a previously captured array would verify nothing
            // about the state after the last destroy/create cycle.
            assertArrayEquals(expCacheNames, new TreeSet<>(client.cacheNames()).toArray());
        }
    }

    /**
     * Tested API:
     * <ul>
     * <li>{@link ClientCache#getName()}</li>
     * <li>{@link ClientCache#getConfiguration()}</li>
     * </ul>
     * Creates a cache from a configuration with non-default values and verifies that the
     * configuration reported by the created cache equals the one it was created from.
     */
    @Test
    public void testCacheConfiguration() throws Exception {
        final String dataRegionName = "functional-test-data-region";

        // The server needs a data region with the name referenced by the cache configuration below.
        DataRegionConfiguration dfltRegion = new DataRegionConfiguration().setName(dataRegionName);

        IgniteConfiguration srvCfg = Config.getServerConfiguration()
            .setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(dfltRegion));

        try (Ignite ignored = Ignition.start(srvCfg);
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            final String CACHE_NAME = "testCacheConfiguration";

            // Field name -> field type, in declaration order.
            LinkedHashMap<String, String> fields = new LinkedHashMap<>();

            fields.put("id", Integer.class.getName());
            fields.put("orgId", Integer.class.getName());

            // Each field is aliased to its upper-cased name.
            Map<String, String> aliases = new HashMap<>();

            for (String field : fields.keySet())
                aliases.put(field, field.toUpperCase());

            QueryEntity employee = new QueryEntity(int.class.getName(), "Employee")
                .setTableName("EMPLOYEE")
                .setFields(fields)
                // During query normalization null keyFields become empty set.
                // Set empty collection for comparator.
                .setKeyFields(Collections.emptySet())
                .setKeyFieldName("id")
                .setNotNullFields(Collections.singleton("id"))
                .setDefaultFieldValues(Collections.singletonMap("id", 0))
                .setIndexes(Collections.singletonList(new QueryIndex("id", true, "IDX_EMPLOYEE_ID")))
                .setAliases(aliases);

            ClientCacheConfiguration expCfg = new ClientCacheConfiguration().setName(CACHE_NAME)
                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
                .setBackups(3)
                .setCacheMode(CacheMode.PARTITIONED)
                .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
                .setEagerTtl(false)
                .setGroupName("FunctionalTest")
                .setDefaultLockTimeout(12345)
                .setPartitionLossPolicy(PartitionLossPolicy.READ_WRITE_SAFE)
                .setReadFromBackup(true)
                .setRebalanceBatchSize(67890)
                .setRebalanceBatchesPrefetchCount(102938)
                .setRebalanceDelay(54321)
                .setRebalanceMode(CacheRebalanceMode.SYNC)
                .setRebalanceOrder(2)
                .setRebalanceThrottle(564738)
                .setRebalanceTimeout(142536)
                .setKeyConfiguration(new CacheKeyConfiguration("Employee", "orgId"))
                .setQueryEntities(employee)
                .setExpiryPolicy(new PlatformExpiryPolicy(10, 20, 30))
                .setCopyOnRead(!CacheConfiguration.DFLT_COPY_ON_READ)
                .setDataRegionName(dataRegionName)
                .setMaxConcurrentAsyncOperations(4)
                .setMaxQueryIteratorsCount(4)
                .setOnheapCacheEnabled(true)
                .setQueryDetailMetricsSize(1024)
                .setQueryParallelism(4)
                .setSqlEscapeAll(true)
                .setSqlIndexMaxInlineSize(1024)
                .setSqlSchema("functional-test-schema")
                .setStatisticsEnabled(true);

            // The cache is created from a copy, so the expected configuration stays a separate object.
            ClientCache<Object, Object> cache = client.createCache(new ClientCacheConfiguration(expCfg));

            assertEquals(CACHE_NAME, cache.getName());

            assertTrue(Comparers.equal(expCfg, cache.getConfiguration()));
        }
    }

    /**
     * Tested API:
     * <ul>
     * <li>{@link Ignition#startClient(ClientConfiguration)}</li>
     * <li>{@link IgniteClient#getOrCreateCache(String)}</li>
     * <li>{@link ClientCache#put(Object, Object)}</li>
     * <li>{@link ClientCache#get(Object)}</li>
     * <li>{@link ClientCache#containsKey(Object)}</li>
     * <li>{@link ClientCache#clear(Object)}</li>
     * </ul>
     * Every scenario starts its own server node and client, stores a single entry, reads it back,
     * clears it and verifies that it is gone.
     */
    @Test
    public void testPutGet() throws Exception {
        // Scenario 1: existing cache, primitive key -> object value.
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCache<Integer, Person> personById = client.getOrCreateCache(Config.DEFAULT_CACHE_NAME);

            Integer id = 1;
            Person joe = new Person(id, "Joe");

            personById.put(id, joe);

            assertTrue(personById.containsKey(id));
            assertEquals(joe, personById.get(id));

            personById.clear(id);

            assertFalse(personById.containsKey(id));
            assertNull(personById.get(id));
        }

        // Scenario 2: non-existing cache, object key -> primitive value.
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCache<Person, Integer> idByPerson = client.getOrCreateCache("testPutGet");

            Integer id = 1;
            Person joe = new Person(id, "Joe");

            idByPerson.put(joe, id);

            assertEquals(id, idByPerson.get(joe));

            idByPerson.clear(joe);

            assertFalse(idByPerson.containsKey(joe));
            assertNull(idByPerson.get(joe));
        }

        // Scenario 3: object key -> object value.
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCache<Person, Person> personByPerson = client.getOrCreateCache("testPutGet");

            Person joeKey = new Person(1, "Joe Key");
            Person joeVal = new Person(1, "Joe Value");

            personByPerson.put(joeKey, joeVal);

            assertEquals(joeVal, personByPerson.get(joeKey));

            personByPerson.clear(joeKey);

            assertFalse(personByPerson.containsKey(joeKey));
            assertNull(personByPerson.get(joeKey));
        }
    }

    /**
     * Runs {@link #checkDataType(IgniteClient, Ignite, Object)} for a set of sample values:
     * primitives and built-in types, an enum, a user object, arrays, lists, sets and maps.
     */
    @Test
    public void testDataTypes() throws Exception {
        try (Ignite srv = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            srv.getOrCreateCache(Config.DEFAULT_CACHE_NAME);

            Person person = new Person(1, "name");

            Object[] scalars = {
                // Primitive and built-in types.
                (byte)1,
                (short)1,
                1,
                1L,
                1.0f,
                1.0d,
                'c',
                true,
                "string",
                UUID.randomUUID(),
                new Date(),
                // Enum.
                VAL1,
                // Binary object.
                person
            };

            Object[] arrays = {
                new byte[] {(byte)1},
                new short[] {(short)1},
                new int[] {1},
                new long[] {1L},
                new float[] {1.0f},
                new double[] {1.0d},
                new char[] {'c'},
                new boolean[] {true},
                new String[] {"string"},
                new UUID[] {UUID.randomUUID()},
                new Date[] {new Date()},
                new int[][] {new int[] {1}},
                new TestEnum[] {VAL1, VAL2, VAL3},
                new Person[] {person},
                new Person[][] {new Person[] {person}},
                new Object[] {1, "string", person, new Person[] {person}}
            };

            Object[] lists = {
                Collections.emptyList(),
                Collections.singletonList(person),
                Arrays.asList(person, person),
                new ArrayList<>(Arrays.asList(person, person)),
                new LinkedList<>(Arrays.asList(person, person)),
                Arrays.asList(Arrays.asList(person, person), person)
            };

            Object[] sets = {
                Collections.emptySet(),
                Collections.singleton(person),
                new HashSet<>(Arrays.asList(1, 2)),
                new HashSet<>(Arrays.asList(Arrays.asList(person, person), person)),
                new HashSet<>(new ArrayList<>(Arrays.asList(Arrays.asList(person, person), person)))
            };

            Object[] maps = {
                Collections.emptyMap(),
                Collections.singletonMap(1, person),
                F.asMap(1, person),
                new HashMap<>(F.asMap(1, person)),
                new HashMap<>(F.asMap(new HashSet<>(Arrays.asList(1, 2)), Arrays.asList(person, person)))
            };

            // Samples are checked one by one, group after group, in declaration order.
            for (Object[] grp : new Object[][] {scalars, arrays, lists, sets, maps}) {
                for (Object sample : grp)
                    checkDataType(client, srv, sample);
            }
        }
    }

    /**
     * Stores the given value through the thin client under a fixed key and verifies that both the thin
     * client and the server node read back an equal value, that both sides resolve the same binary type
     * ID for the value's class, and that conditional replace/remove accept the value.
     *
     * @param client Thin client.
     * @param ignite Ignite node.
     * @param obj Value of data type to check.
     */
    private void checkDataType(IgniteClient client, Ignite ignite, Object obj) {
        final Integer key = 1;

        IgniteCache<Object, Object> srvCache = ignite.cache(Config.DEFAULT_CACHE_NAME);
        ClientCache<Object, Object> clientCache = client.cache(Config.DEFAULT_CACHE_NAME);

        clientCache.put(key, obj);

        assertTrue(clientCache.containsKey(key));

        // Value as restored by the thin client; reused below for the conditional remove.
        Object restored = clientCache.get(key);

        assertEqualsArraysAware(obj, restored);
        assertEqualsArraysAware(obj, srvCache.get(key));

        String typeName = obj.getClass().getName();

        assertEquals(client.binary().typeId(typeName), ignite.binary().typeId(typeName));

        // Server-side comparison with the original object.
        assertTrue(clientCache.replace(key, obj, obj));

        // Server-side comparison with the restored object.
        assertTrue(clientCache.remove(key, restored));
    }

    /**
     * Tested API:
     * <ul>
     * <li>{@link ClientCache#putAll(Map)}</li>
     * <li>{@link ClientCache#getAll(Set)}</li>
     * <li>{@link ClientCache#containsKeys(Set)}</li>
     * <li>{@link ClientCache#clear()}</li>
     * <li>{@link ClientCache#clearAll(Set)}</li>
     * </ul>
     *
     * @throws Exception If the server node or the client fails to start or stop.
     */
    @Test
    public void testBatchPutGet() throws Exception {
        // Existing cache, primitive key and object value
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCache<Integer, Person> cache = client.cache(Config.DEFAULT_CACHE_NAME);

            Map<Integer, Person> data = IntStream
                .rangeClosed(1, 1000).boxed()
                .collect(Collectors.toMap(i -> i, i -> new Person(i, String.format("Person %s", i))));

            assertFalse(cache.containsKeys(data.keySet()));

            cache.putAll(data);

            assertTrue(cache.containsKeys(data.keySet()));

            Map<Integer, Person> cachedData = cache.getAll(data.keySet());

            assertEquals(data, cachedData);
        }

        // Non-existing cache, object key and primitive value
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCache<Person, Integer> cache = client.createCache("testBatchPutGet");

            Map<Person, Integer> data = IntStream
                .rangeClosed(1, 1000).boxed()
                .collect(Collectors.toMap(i -> new Person(i, String.format("Person %s", i)), i -> i));

            assertFalse(cache.containsKeys(data.keySet()));

            cache.putAll(data);

            assertTrue(cache.containsKeys(data.keySet()));

            Map<Person, Integer> cachedData = cache.getAll(data.keySet());

            assertEquals(data, cachedData);

            // Clear the first 100 keys (in the map's iteration order) and keep the rest.
            Set<Person> clearKeys = data.keySet().stream().limit(100).collect(Collectors.toSet());

            cache.clearAll(clearKeys);

            assertFalse(cache.containsKeys(clearKeys));

            // Keys that were not cleared must still be present. The filter has to exclude the cleared keys:
            // filtering the key set by absence from its own map would always produce an empty set
            // and make this assertion vacuous.
            Set<Person> remainingKeys = data.keySet().stream()
                .filter(key -> !clearKeys.contains(key))
                .collect(Collectors.toSet());

            assertTrue(cache.containsKeys(remainingKeys));
            assertEquals(data.size() - clearKeys.size(), cache.size(CachePeekMode.ALL));

            cache.clear();

            assertFalse(cache.containsKeys(data.keySet()));
            assertEquals(0, cache.size(CachePeekMode.ALL));
        }
    }

    /**
     * Tested API:
     * <ul>
     * <li>{@link ClientCache#getAndPut(Object, Object)}</li>
     * <li>{@link ClientCache#getAndRemove(Object)}</li>
     * <li>{@link ClientCache#getAndReplace(Object, Object)}</li>
     * <li>{@link ClientCache#putIfAbsent(Object, Object)}</li>
     * <li>{@link ClientCache#getAndPutIfAbsent(Object, Object)}</li>
     * </ul>
     *
     * @throws Exception If the server node or the client fails to start or stop.
     */
    @Test
    public void testAtomicPutGet() throws Exception {
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            // Cache is named after the test, like in the other tests of this class.
            ClientCache<Integer, String> cache = client.createCache("testAtomicPutGet");

            // getAndPut: returns the previous value, null when there was none.
            assertNull(cache.getAndPut(1, "1"));
            assertEquals("1", cache.getAndPut(1, "1.1"));

            // getAndRemove: returns the removed value, null when nothing was removed.
            assertEquals("1.1", cache.getAndRemove(1));
            assertNull(cache.getAndRemove(1));

            // putIfAbsent: succeeds only for a missing key.
            assertTrue(cache.putIfAbsent(1, "1"));
            assertFalse(cache.putIfAbsent(1, "1.1"));

            // getAndReplace: returns the previous value, null for a missing key.
            assertEquals("1", cache.getAndReplace(1, "1.1"));
            assertEquals("1.1", cache.getAndReplace(1, "1"));
            assertNull(cache.getAndReplace(2, "2"));

            // getAndPutIfAbsent: existing value is returned and kept; a missing key gets the new value.
            assertEquals("1", cache.getAndPutIfAbsent(1, "1.1"));
            assertEquals("1", cache.get(1));
            assertNull(cache.getAndPutIfAbsent(3, "3"));
            assertEquals("3", cache.get(3));
        }
    }

    /**
     * Tested API:
     * <ul>
     * <li>{@link ClientCache#replace(Object, Object)}</li>
     * <li>{@link ClientCache#replace(Object, Object, Object)}</li>
     * <li>{@link ClientCache#remove(Object)}</li>
     * <li>{@link ClientCache#remove(Object, Object)}</li>
     * <li>{@link ClientCache#removeAll()}</li>
     * <li>{@link ClientCache#removeAll(Set)}</li>
     * </ul>
     * The cache is pre-filled with keys 1..100, each mapped to its decimal string.
     */
    @Test
    public void testRemoveReplace() throws Exception {
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCache<Integer, String> strById = client.createCache("testRemoveReplace");

            Map<Integer, String> initial = new HashMap<>();

            for (int i = 1; i <= 100; i++)
                initial.put(i, Integer.toString(i));

            strById.putAll(initial);

            // Conditional replace: applied only when the current value matches.
            assertFalse(strById.replace(1, "2", "3"));
            assertEquals("1", strById.get(1));
            assertTrue(strById.replace(1, "1", "3"));
            assertEquals("3", strById.get(1));

            // Unconditional replace: applied only when the key exists.
            assertFalse(strById.replace(101, "101"));
            assertNull(strById.get(101));
            assertTrue(strById.replace(100, "101"));
            assertEquals("101", strById.get(100));

            // Remove by key.
            assertFalse(strById.remove(101));
            assertTrue(strById.remove(100));
            assertNull(strById.get(100));

            // Conditional remove: applied only when the current value matches.
            assertFalse(strById.remove(99, "100"));
            assertEquals("99", strById.get(99));
            assertTrue(strById.remove(99, "99"));
            assertNull(strById.get(99));

            // Extra entry outside of the initial key set must survive removeAll(Set).
            strById.put(101, "101");

            strById.removeAll(initial.keySet());
            assertEquals(1, strById.size());
            assertEquals("101", strById.get(101));

            strById.removeAll();
            assertEquals(0, strById.size());
        }
    }

    /**
     * Starts a client without starting any server and expects {@link ClientConnectionException};
     * any other exception, or no exception at all, fails the test.
     */
    @Test
    public void testClientFailsOnStart() {
        final String expType = ClientConnectionException.class.getName();

        //noinspection EmptyTryBlock
        try (IgniteClient ignored = Ignition.startClient(getClientConfiguration())) {
            // No-op.
        }
        catch (ClientConnectionException expected) {
            // This is the outcome the test is looking for.
            return;
        }
        catch (Exception unexpected) {
            fail(String.format(
                "%s expected but %s was received: %s",
                expType,
                unexpected.getClass().getName(),
                unexpected
            ));
        }

        // The client started successfully although no server was started by this test.
        fail(String.format("%s expected but no exception was received", expType));
    }


    /**
     * Test PESSIMISTIC REPEATABLE_READ tx holds lock and other tx should be timed out.
     * Delegates to {@link #testPessimisticTxLocking(TransactionIsolation)} with {@code REPEATABLE_READ} isolation.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPessimisticRepeatableReadsTransactionHoldsLock() throws Exception {
        testPessimisticTxLocking(REPEATABLE_READ);
    }

    /**
     * Test PESSIMISTIC SERIALIZABLE tx holds lock and other tx should be timed out.
     * Delegates to {@link #testPessimisticTxLocking(TransactionIsolation)} with {@code SERIALIZABLE} isolation.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPessimisticSerializableTransactionHoldsLock() throws Exception {
        testPessimisticTxLocking(SERIALIZABLE);
    }

    /**
     * Test pessimistic tx holds the lock.
     * <p>
     * The main thread opens a PESSIMISTIC transaction with the given isolation and reads key {@code 0}.
     * While that transaction is still open, a second thread tries to update the same key in its own
     * transaction. The test expects the second attempt to fail with a lock timeout error and the
     * original value to remain in the cache.
     *
     * @param isolation Isolation level of the pessimistic transaction that is expected to hold the lock.
     * @throws Exception If failed.
     */
    private void testPessimisticTxLocking(TransactionIsolation isolation) throws Exception {
        try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
                    .setName("cache")
                    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
            );
            cache.put(0, "value0");

            // Assigned inside the transaction block, checked after it is closed.
            IgniteInternalFuture<?> fut;

            try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, isolation)) {
                // Read within the pessimistic tx; per the test's intent this is the operation that locks key 0.
                assertEquals("value0", cache.get(0));

                // Two parties: this thread and the asynchronous task below.
                CyclicBarrier barrier = new CyclicBarrier(2);

                fut = GridTestUtils.runAsync(() -> {
                    // Concurrent update of the same key. The third txStart argument (500) is presumably
                    // the tx timeout in milliseconds - TODO confirm against ClientTransactions#txStart.
                    try (ClientTransaction tx2 = client.transactions().txStart(OPTIMISTIC, REPEATABLE_READ, 500)) {
                        cache.put(0, "value2");
                        tx2.commit();
                    }
                    finally {
                        // Always meet the main thread on the barrier, even when put/commit failed.
                        // Barrier failures are deliberately ignored: the outcome is checked via the future.
                        try {
                            barrier.await(2000, TimeUnit.MILLISECONDS);
                        }
                        catch (Throwable ignore) {
                            // No-op.
                        }
                    }
                });

                // Keep the first tx open until the concurrent attempt has finished (waits at most 2 seconds).
                barrier.await(2000, TimeUnit.MILLISECONDS);

                tx.commit();
            }

            // The concurrent update must not have been applied.
            assertEquals("value0", cache.get(0));

            // The asynchronous task must have failed with a lock timeout error.
            assertThrowsAnyCause(null, fut::get, ClientException.class,
                    "Failed to acquire lock within provided timeout");
        }
    }

    /**
     * Test OPTIMISTIC SERIALIZABLE tx rolls back if another TX commits.
     * <p>
     * The main thread reads key {@code 0} inside an OPTIMISTIC SERIALIZABLE transaction, then a second
     * thread updates and commits the same key. The subsequent commit of the first transaction is
     * expected to fail with a read/write conflict, leaving the value written by the second thread.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testOptimitsticSerializableTransactionHoldsLock() throws Exception {
        try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
                    .setName("cache")
                    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
            );
            cache.put(0, "value0");

            // Released by the asynchronous task once its transaction block has been left.
            final CountDownLatch latch = new CountDownLatch(1);

            try (ClientTransaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
                // Read within the serializable tx, before the concurrent update happens.
                assertEquals("value0", cache.get(0));

                IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
                    try (ClientTransaction tx2 = client.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
                        cache.put(0, "value2");
                        tx2.commit();
                    }
                    finally {
                        latch.countDown();
                    }
                });

                // Wait for the concurrent transaction to finish before writing in the first one.
                latch.await();

                cache.put(0, "value1");

                // Propagates any failure of the asynchronous task.
                fut.get();

                // Commit of the first tx must be rejected.
                assertThrowsAnyCause(null, () -> {
                    tx.commit();
                    return null;
                }, ClientException.class, "read/write conflict");
            }

            // Only the concurrent update is visible.
            assertEquals("value2", cache.get(0));
        }
    }

    /**
     * Test OPTIMISTIC REPEATABLE_READ tx doesn't conflict with a regular cache put.
     * <p>
     * A value written inside the transaction is committed after another thread has updated the same
     * key outside of the transaction; the committed transactional value is expected to win.
     */
    @Test
    public void testOptimitsticRepeatableReadUpdatesValue() throws Exception {
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCacheConfiguration txCacheCfg = new ClientCacheConfiguration()
                .setName("cache")
                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

            ClientCache<Integer, String> txCache = client.createCache(txCacheCfg);

            txCache.put(0, "value0");

            try (ClientTransaction tx = client.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) {
                assertEquals("value0", txCache.get(0));

                txCache.put(0, "value1");

                // Update of the same key from another thread; that thread asserts it does not see
                // the uncommitted transactional value.
                IgniteInternalFuture<?> outsidePut = GridTestUtils.runAsync(() -> {
                    assertEquals("value0", txCache.get(0));

                    txCache.put(0, "value2");

                    assertEquals("value2", txCache.get(0));
                });

                outsidePut.get();

                tx.commit();
            }

            assertEquals("value1", txCache.get(0));
        }
    }

    /**
     * Test that client-connector worker can process further transactional requests (resume transactions) after
     * external termination of previous transaction.
     * <p>
     * The server is started with a client connector thread pool of size 1.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTxResumeAfterTxTimeout() throws Exception {
        // Pool size 1 (presumably so that every client request is served by the same worker - TODO confirm).
        IgniteConfiguration cfg = Config.getServerConfiguration().setClientConnectorConfiguration(
            new ClientConnectorConfiguration().setThreadPoolSize(1));

        try (Ignite ignite = Ignition.start(cfg); IgniteClient client = Ignition.startClient(getClientConfiguration())) {
            String cacheName = "cache";

            IgniteCache<Object, Object> igniteCache = ignite.createCache(new CacheConfiguration<>(cacheName)
                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));

            // Transaction started by the test thread; it is used again after the asynchronous part is done.
            try (ClientTransaction clientTx = client.transactions().txStart()) {
                runAsync(() -> {
                    try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
                        igniteCache.put(0, 0); // Lock key by ignite node.

                        try {
                            // Start, but don't close the transaction (to keep it in the threadMap after timeout).
                            client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 200L);

                            // Wait until transaction interrupted externally by timeout.
                            client.cache(cacheName).put(0, 0);

                            // The put above is expected to throw.
                            fail();
                        }
                        catch (ClientException ignored) {
                            // Expected.
                        }
                    }
                }).get();

                // Resume tx in the worker with interrupted transaction.
                assertFalse(client.cache(cacheName).containsKey(0));
            }
        }
    }

    /**
     * Test thin client transactions. The following scenarios are checked one after another on the same
     * transactional cache (each scenario relies on the values left in the cache by the previous ones):
     * <ul>
     *     <li>nested transactions are rejected;</li>
     *     <li>implicit rollback on close, explicit rollback and commit;</li>
     *     <li>ending of an already completed and of an outdated transaction;</li>
     *     <li>transaction with a timeout;</li>
     *     <li>behavior after all connections have been dropped (failover);</li>
     *     <li>concurrent transactions in different connections and in different threads;</li>
     *     <li>different types of cache operations inside a transaction;</li>
     *     <li>usage of one transaction by different threads;</li>
     *     <li>limit of active transactions;</li>
     *     <li>starting of new transactions after commit/rollback of the previous one without closing it.</li>
     * </ul>
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTransactions() throws Exception {
        try (Ignite ignite = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCache<Integer, String> cache = client.createCache(new ClientCacheConfiguration()
                .setName("cache")
                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
            );

            // Initial state of the cache.
            cache.put(0, "value0");
            cache.put(1, "value1");

            // Test that nested transactions are not possible.
            try (ClientTransaction tx = client.transactions().txStart()) {
                try (ClientTransaction tx1 = client.transactions().txStart()) {
                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }
            }

            // Test implicit rollback when transaction closed.
            try (ClientTransaction tx = client.transactions().txStart()) {
                cache.put(1, "value2");
            }

            assertEquals("value1", cache.get(1));

            // Test explicit rollback.
            try (ClientTransaction tx = client.transactions().txStart()) {
                cache.put(1, "value2");

                tx.rollback();
            }

            assertEquals("value1", cache.get(1));

            // Test commit.
            try (ClientTransaction tx = client.transactions().txStart()) {
                cache.put(1, "value2");

                tx.commit();
            }

            assertEquals("value2", cache.get(1));

            // Test end of already completed transaction.
            // Note: tx0 is reused by the scenarios below as a handle of an outdated transaction.
            ClientTransaction tx0 = client.transactions().txStart();

            tx0.close();

            try {
                tx0.commit();

                fail();
            }
            catch (ClientException expected) {
                // No-op.
            }

            // Test end of outdated transaction.
            try (ClientTransaction tx = client.transactions().txStart()) {
                try {
                    tx0.commit();

                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }

                // The current transaction must still be committable after the failed commit of tx0.
                tx.commit();
            }

            // Test transaction with a timeout.
            long TX_TIMEOUT = 200L;

            try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, TX_TIMEOUT)) {
                long txStartedTime = U.currentTimeMillis();

                cache.put(1, "value3");

                // Wait until the transaction timeout has elapsed.
                while (txStartedTime + TX_TIMEOUT >= U.currentTimeMillis())
                    U.sleep(100L);

                // Cache operation within the timed out transaction is expected to fail.
                try {
                    cache.put(1, "value4");

                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }

                // Commit of the timed out transaction is expected to fail as well.
                try {
                    tx.commit();

                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }
            }

            // Neither "value3" nor "value4" is expected to be visible after the timed out transaction.
            assertEquals("value2", cache.get(1));

            cache.put(1, "value5");

            // Test failover.
            ClientProcessorMXBean mxBean = getMxBean(ignite.name(), "Clients",
                ClientListenerProcessor.class, ClientProcessorMXBean.class);

            try (ClientTransaction tx = client.transactions().txStart()) {
                cache.put(1, "value6");

                // Drop all connections using the client processor MX bean of the started node.
                mxBean.dropAllConnections();

                try {
                    cache.put(1, "value7");

                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }

                // Start new transaction doesn't recover cache operations on failed channel.
                try (ClientTransaction tx1 = client.transactions().txStart()) {
                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }

                try {
                    cache.get(1);

                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }

                // Close outdated transaction doesn't recover cache operations on failed channel.
                tx0.close();

                try {
                    cache.get(1);

                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }
            }

            // Once the failed transaction is closed, cache operations work again and "value6" is not visible.
            assertEquals("value5", cache.get(1));

            // Test concurrent transactions in different connections.
            try (IgniteClient client1 = Ignition.startClient(getClientConfiguration())) {
                ClientCache<Integer, String> cache1 = client1.cache("cache");

                try (ClientTransaction tx = client.transactions().txStart(OPTIMISTIC, READ_COMMITTED)) {
                    cache.put(0, "value8");

                    try (ClientTransaction tx1 = client1.transactions().txStart(OPTIMISTIC, READ_COMMITTED)) {
                        // Each connection sees its own uncommitted changes only.
                        assertEquals("value8", cache.get(0));
                        assertEquals("value0", cache1.get(0));

                        cache1.put(1, "value9");

                        assertEquals("value5", cache.get(1));
                        assertEquals("value9", cache1.get(1));

                        tx1.commit();

                        // Changes committed by the second connection become visible to the first one.
                        assertEquals("value9", cache.get(1));
                    }

                    assertEquals("value0", cache1.get(0));

                    tx.commit();

                    assertEquals("value8", cache1.get(0));
                }
            }

            // Check different types of cache operations.
            try (ClientTransaction tx = client.transactions().txStart()) {
                // Operations: put, putAll, putIfAbsent.
                cache.put(2, "value10");
                cache.putAll(F.asMap(1, "value11", 3, "value12"));
                cache.putIfAbsent(4, "value13");

                // Operations: get, getAll, getAndPut, getAndRemove, getAndReplace, getAndPutIfAbsent.
                assertEquals("value10", cache.get(2));
                assertEquals(F.asMap(1, "value11", 2, "value10"),
                    cache.getAll(new HashSet<>(Arrays.asList(1, 2))));
                assertEquals("value13", cache.getAndPut(4, "value14"));
                assertEquals("value14", cache.getAndPutIfAbsent(4, "valueDiscarded"));
                assertEquals("value14", cache.get(4));
                assertEquals("value14", cache.getAndReplace(4, "value15"));
                assertEquals("value15", cache.getAndRemove(4));
                assertNull(cache.getAndPutIfAbsent(10, "valuePutIfAbsent"));
                assertEquals("valuePutIfAbsent", cache.get(10));

                // Operations: contains.
                assertTrue(cache.containsKey(2));
                assertFalse(cache.containsKey(4));
                assertTrue(cache.containsKeys(ImmutableSet.of(2, 10)));
                assertFalse(cache.containsKeys(ImmutableSet.of(2, 4)));

                // Operations: replace.
                cache.put(4, "");
                assertTrue(cache.replace(4, "value16"));
                assertTrue(cache.replace(4, "value16", "value17"));

                // Operations: remove, removeAll
                cache.putAll(F.asMap(5, "", 6, ""));
                assertTrue(cache.remove(5));
                assertTrue(cache.remove(4, "value17"));
                cache.removeAll(new HashSet<>(Arrays.asList(3, 6)));
                assertFalse(cache.containsKey(3));
                assertFalse(cache.containsKey(6));

                tx.rollback();
            }

            // None of the operations above is expected to be visible after the rollback.
            assertEquals(F.asMap(0, "value8", 1, "value9"),
                cache.getAll(new HashSet<>(Arrays.asList(0, 1))));
            assertFalse(cache.containsKey(2));

            // Test concurrent transactions started by different threads.
            try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
                // The barrier keeps the steps of the current thread and of the async thread in lockstep.
                CyclicBarrier barrier = new CyclicBarrier(2);

                cache.put(0, "value18");

                IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> {
                    try (ClientTransaction tx1 = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
                        cache.put(1, "value19");

                        barrier.await();

                        // Uncommitted change made by the other thread is not visible here.
                        assertEquals("value8", cache.get(0));

                        barrier.await();

                        tx1.commit();

                        barrier.await();

                        assertEquals("value18", cache.get(0));
                    }
                    catch (InterruptedException | BrokenBarrierException ignore) {
                        // No-op.
                    }
                });

                barrier.await();

                // Uncommitted change made by the async thread is not visible here.
                assertEquals("value9", cache.get(1));

                barrier.await();

                tx.commit();

                barrier.await();

                assertEquals("value19", cache.get(1));

                fut.get();
            }

            // Test transaction usage by different threads.
            try (ClientTransaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
                cache.put(0, "value20");

                GridTestUtils.runAsync(() -> {
                    // Implicit transaction started here.
                    cache.put(1, "value21");

                    assertEquals("value18", cache.get(0));

                    try {
                        // Transaction can't be committed by another thread.
                        tx.commit();

                        fail();
                    }
                    catch (ClientException expected) {
                        // No-op.
                    }

                    // Transaction can be closed by another thread.
                    tx.close();

                    assertEquals("value18", cache.get(0));
                }).get();

                assertEquals("value21", cache.get(1));

                try {
                    // Transaction can't be committed after another thread closes this transaction.
                    tx.commit();

                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }

                // The change made inside the closed transaction ("value20") is expected to be discarded.
                assertEquals("value18", cache.get(0));

                // Start implicit transaction after explicit transaction has been closed by another thread.
                cache.put(0, "value22");

                GridTestUtils.runAsync(() -> assertEquals("value22", cache.get(0))).get();

                // New explicit transaction can be started after current transaction has been closed by another thread.
                try (ClientTransaction tx1 = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
                    cache.put(0, "value23");

                    tx1.commit();
                }

                assertEquals("value23", cache.get(0));
            }

            // Test active transactions limit.
            int txLimit = ignite.configuration().getClientConnectorConfiguration().getThinClientConfiguration()
                .getMaxActiveTxPerConnection();

            List<ClientTransaction> txs = new ArrayList<>(txLimit);

            // Every transaction is started in a separate thread, the threads are run one by one.
            for (int i = 0; i < txLimit; i++) {
                Thread t = new Thread(() -> txs.add(client.transactions().txStart()));

                t.start();

                t.join();
            }

            // One more transaction on the same connection is expected to exceed the limit.
            try (ClientTransaction ignored = client.transactions().txStart()) {
                fail();
            }
            catch (ClientException e) {
                ClientServerError cause = (ClientServerError)e.getCause();
                assertEquals(ClientStatus.TX_LIMIT_EXCEEDED, cause.getCode());
            }

            for (ClientTransaction tx : txs)
                tx.close();

            // Test that new transaction can be started after commit of the previous one without closing.
            ClientTransaction tx = client.transactions().txStart();
            tx.commit();

            tx = client.transactions().txStart();
            tx.rollback();

            // Test that new transaction can be started after rollback of the previous one without closing.
            tx = client.transactions().txStart();
            tx.commit();

            // Test that implicit transaction started after commit of previous one without closing.
            cache.put(0, "value24");

            GridTestUtils.runAsync(() -> assertEquals("value24", cache.get(0))).get();
        }
    }

    /**
     * Test implicit rollback, explicit rollback and commit of a transaction when the cache is updated
     * by the asynchronous {@code putAsync} operation.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTransactionsAsync() throws Exception {
        try (Ignite ignored = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration()
                .setName("cache")
                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

            ClientCache<Integer, String> txCache = client.createCache(cacheCfg);

            txCache.put(0, "value0");
            txCache.put(1, "value1");

            // Closing a transaction without commit must roll the async update back.
            try (ClientTransaction implicitRollbackTx = client.transactions().txStart()) {
                txCache.putAsync(1, "value2").get();
            }

            assertEquals("value1", txCache.get(1));

            // Explicit rollback must discard the async update as well.
            try (ClientTransaction rollbackTx = client.transactions().txStart()) {
                txCache.putAsync(1, "value2").get();

                rollbackTx.rollback();
            }

            assertEquals("value1", txCache.get(1));

            // Commit must make the async update visible.
            try (ClientTransaction commitTx = client.transactions().txStart()) {
                txCache.putAsync(1, "value2").get();

                commitTx.commit();
            }

            assertEquals("value2", txCache.get(1));
        }
    }

    /**
     * Test labeled transactions: the label is checked through the transactions system view of the server node.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testTransactionsWithLabel() throws Exception {
        try (IgniteEx srv = (IgniteEx)Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration()
                .setName("cache")
                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

            ClientCache<Integer, String> txCache = client.createCache(cacheCfg);

            SystemView<TransactionView> txViews = srv.context().systemView().view(TXS_MON_LIST);

            txCache.put(0, "value1");

            // Single label, the transaction is rolled back on close.
            try (ClientTransaction labeledTx = client.transactions().withLabel("label").txStart()) {
                txCache.put(0, "value2");

                assertEquals(1, F.size(txViews.iterator()));

                assertEquals("label", txViews.iterator().next().label());

                assertEquals("value2", txCache.get(0));
            }

            assertEquals("value1", txCache.get(0));

            // When the label is set twice, the last one is expected to be used.
            try (ClientTransaction relabeledTx =
                     client.transactions().withLabel("label1").withLabel("label2").txStart()) {
                txCache.put(0, "value2");

                assertEquals(1, F.size(txViews.iterator()));

                assertEquals("label2", txViews.iterator().next().label());

                relabeledTx.commit();
            }

            assertEquals("value2", txCache.get(0));

            // Labeled transaction and transaction without label running concurrently.
            try (ClientTransaction labeledTx =
                     client.transactions().withLabel("label").txStart(PESSIMISTIC, READ_COMMITTED)) {
                CyclicBarrier sync = new CyclicBarrier(2);

                txCache.put(0, "value3");

                IgniteInternalFuture<?> asyncFut = GridTestUtils.runAsync(() -> {
                    try (ClientTransaction plainTx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) {
                        txCache.put(1, "value3");

                        sync.await();

                        assertEquals("value2", txCache.get(0));

                        sync.await();
                    }
                    catch (InterruptedException | BrokenBarrierException ignore) {
                        // No-op.
                    }
                });

                sync.await();

                assertNull(txCache.get(1));

                // Exactly one transaction without label and exactly one with the label are expected.
                assertEquals(1, F.size(txViews.iterator(), view -> view.label() == null));
                assertEquals(1, F.size(txViews.iterator(), view -> "label".equals(view.label())));

                sync.await();

                asyncFut.get();
            }

            // Nested transactions must be rejected, no matter whether the inner one is labeled.
            try (ClientTransaction outerTx = client.transactions().withLabel("label1").txStart()) {
                try (ClientTransaction innerTx = client.transactions().txStart()) {
                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }

                try (ClientTransaction innerTx = client.transactions().withLabel("label2").txStart()) {
                    fail();
                }
                catch (ClientException expected) {
                    // No-op.
                }
            }
        }
    }

    /**
     * Test cache with expire policy: created, modified and accessed expiry policies are checked, as well as
     * the combination of expire policy, keep binary and transactional flags.
     * <p>
     * The time-sensitive part is retried when its execution takes longer than the TTL.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testExpirePolicy() throws Exception {
        long ttl = 600L;
        int maxRetries = 5;

        // Pause which is shorter than the TTL, but two such pauses are longer than the TTL.
        long pause = ttl / 3 * 2;

        try (Ignite srv = Ignition.start(Config.getServerConfiguration());
             IgniteClient client = Ignition.startClient(getClientConfiguration())
        ) {
            ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration()
                .setName("cache")
                .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);

            ClientCache<Integer, Object> cache = client.createCache(cacheCfg);

            Duration ttlDur = new Duration(TimeUnit.MILLISECONDS, ttl);

            ClientCache<Integer, Object> createdPlcCache = cache.withExpirePolicy(new CreatedExpiryPolicy(ttlDur));
            ClientCache<Integer, Object> modifiedPlcCache = cache.withExpirePolicy(new ModifiedExpiryPolicy(ttlDur));
            ClientCache<Integer, Object> accessedPlcCache = cache.withExpirePolicy(new AccessedExpiryPolicy(ttlDur));

            for (int attempt = 0; attempt < maxRetries; attempt++) {
                cache.clear();

                long startTs = U.currentTimeMillis();

                cache.put(0, 0);
                createdPlcCache.put(1, 1);
                modifiedPlcCache.put(2, 2);
                accessedPlcCache.put(3, 3);

                U.sleep(pause);

                boolean[] present = {
                    cache.containsKey(0),
                    cache.containsKey(1),
                    cache.containsKey(2),
                    cache.containsKey(3)
                };

                // Retry if this block execution takes too long.
                boolean tooSlow = U.currentTimeMillis() - startTs >= ttl;

                if (tooSlow)
                    continue;

                // Nothing has expired yet.
                assertTrue(present[0]);
                assertTrue(present[1]);
                assertTrue(present[2]);
                assertTrue(present[3]);

                startTs = U.currentTimeMillis();

                // Update and access key with created expire policy.
                createdPlcCache.put(1, 2);
                createdPlcCache.get(1);

                // Update key with modified expire policy.
                modifiedPlcCache.put(2, 3);

                // Access key with accessed expire policy.
                accessedPlcCache.get(3);

                U.sleep(pause);

                present = new boolean[] {
                    cache.containsKey(0),
                    cache.containsKey(1),
                    cache.containsKey(2),
                    cache.containsKey(3)
                };

                // Retry if this block execution takes too long.
                tooSlow = U.currentTimeMillis() - startTs >= ttl;

                if (tooSlow)
                    continue;

                // Only the key with created expire policy is gone.
                assertTrue(present[0]);
                assertFalse(present[1]);
                assertTrue(present[2]);
                assertTrue(present[3]);

                U.sleep(pause);

                // Access key with updated expire policy.
                modifiedPlcCache.get(2);

                U.sleep(pause);

                // Only the key which was put without expire policy is left.
                assertTrue(cache.containsKey(0));
                assertFalse(cache.containsKey(1));
                assertFalse(cache.containsKey(2));
                assertFalse(cache.containsKey(3));

                // Expire policy, keep binary and transactional flags together.
                ClientCache<Integer, Object> binCreatedPlcCache = createdPlcCache.withKeepBinary();

                try (ClientTransaction tx = client.transactions().txStart()) {
                    binCreatedPlcCache.put(4, new T2<>("test", "test"));

                    tx.commit();
                }

                assertTrue(binCreatedPlcCache.get(4) instanceof BinaryObject);
                assertFalse(cache.get(4) instanceof BinaryObject);

                U.sleep(ttl / 3 * 4);

                assertFalse(cache.containsKey(4));

                return;
            }

            fail("Failed to check expire policy within " + maxRetries + " retries (block execution takes too long)");
        }
    }

    /**
     * Creates configuration of a client which connects to {@code Config.SERVER}, with both the send
     * and the receive buffer sizes set to {@code 0}.
     *
     * @return Client configuration.
     */
    private static ClientConfiguration getClientConfiguration() {
        ClientConfiguration cfg = new ClientConfiguration();

        cfg.setAddresses(Config.SERVER);
        cfg.setSendBufferSize(0);
        cfg.setReceiveBufferSize(0);

        return cfg;
    }
}
