/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.metastorage.client;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.internal.metastorage.common.OperationType;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageCommandListener;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessage;
import org.apache.ignite.network.internal.recovery.message.HandshakeStartMessageSerializationFactory;
import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessage;
import org.apache.ignite.network.internal.recovery.message.HandshakeStartResponseMessageSerializationFactory;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessage;
import org.apache.ignite.network.scalecube.message.ScaleCubeMessageSerializationFactory;
import org.apache.ignite.network.serialization.MessageSerializationRegistry;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactoryImpl;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.server.RaftServer;
import org.apache.ignite.raft.server.impl.RaftServerImpl;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

/**
 * Meta storage client tests.
 */
@SuppressWarnings("WeakerAccess")
public class ITMetaStorageServiceTest {
    /** The logger. */
    private static final IgniteLogger LOG = IgniteLogger.forClass(ITMetaStorageServiceTest.class);

    /** Base network port. */
    private static final int NODE_PORT_BASE = 20_000;

    /** Nodes. */
    private static final int NODES = 2;

    /** */
    private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";

    /** */
    public static final int LATEST_REVISION = -1;

    /** Factory. */
    private static final RaftClientMessageFactory FACTORY = new RaftClientMessageFactoryImpl();

    /** Network factory. */
    private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();

    /** */
    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry()
        .registerFactory(ScaleCubeMessage.TYPE, new ScaleCubeMessageSerializationFactory())
        .registerFactory(HandshakeStartMessage.TYPE, new HandshakeStartMessageSerializationFactory())
        .registerFactory(HandshakeStartResponseMessage.TYPE, new HandshakeStartResponseMessageSerializationFactory());

    /**  Expected server result entry. */
    private static final org.apache.ignite.internal.metastorage.server.Entry EXPECTED_SRV_RESULT_ENTRY =
            new org.apache.ignite.internal.metastorage.server.Entry(
                    new byte[] {1},
                    new byte[] {2},
                    10,
                    2
            );

    /**  Expected server result entry. */
    private static final EntryImpl EXPECTED_RESULT_ENTRY =
            new EntryImpl(
                    new ByteArray(new byte[] {1}),
                    new byte[] {2},
                    10,
                    2
            );

    /** Expected result map. */
    private static final NavigableMap<ByteArray, Entry> EXPECTED_RESULT_MAP;

    private static final Collection<org.apache.ignite.internal.metastorage.server.Entry> EXPECTED_SRV_RESULT_COLL;

    /** Cluster. */
    private ArrayList<ClusterService> cluster = new ArrayList<>();

    /**  Meta storage raft server. */
    private RaftServer metaStorageRaftSrv;

    static {
        EXPECTED_RESULT_MAP = new TreeMap<>();

        EntryImpl entry1 = new EntryImpl(
                new ByteArray(new byte[]{1}),
                new byte[]{2},
                10,
                2
        );

        EXPECTED_RESULT_MAP.put(entry1.key(), entry1);

        EntryImpl entry2 = new EntryImpl(
                new ByteArray(new byte[]{3}),
                new byte[]{4},
                10,
                3
        );

        EXPECTED_RESULT_MAP.put(entry2.key(), entry2);

        EXPECTED_SRV_RESULT_COLL = new ArrayList<>();

        EXPECTED_SRV_RESULT_COLL.add(new org.apache.ignite.internal.metastorage.server.Entry(
                entry1.key().bytes(), entry1.value(), entry1.revision(), entry1.updateCounter()
        ));

        EXPECTED_SRV_RESULT_COLL.add(new org.apache.ignite.internal.metastorage.server.Entry(
                entry2.key().bytes(), entry2.value(), entry2.revision(), entry2.updateCounter()
        ));
    }

    /**
     * Run @{code} NODES cluster nodes.
     */
    @BeforeEach
    public void beforeTest() {
        for (int i = 0; i < NODES; i++) {
            cluster.add(
                    startClusterNode(
                            "node_" + i,
                            NODE_PORT_BASE + i,
                            IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES).boxed().
                                    map((port) -> "localhost:" + port).collect(Collectors.toList())));
        }

        for (ClusterService node : cluster)
            assertTrue(waitForTopology(node, NODES, 1000));

        LOG.info("Cluster started.");
    }

    /**
     * Shutdown raft server and stop all cluster nodes.
     *
     * @throws Exception If failed to shutdown raft server,
     */
    @AfterEach
    public void afterTest() throws Exception {
        metaStorageRaftSrv.shutdown();

        for (ClusterService node : cluster)
            node.shutdown();
    }

    /**
     * Tests {@link MetaStorageService#get(ByteArray)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testGet() throws Exception {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public @NotNull org.apache.ignite.internal.metastorage.server.Entry get(byte[] key) {
                        return EXPECTED_SRV_RESULT_ENTRY;
                    }
                });

        assertEquals(EXPECTED_RESULT_ENTRY, metaStorageSvc.get(EXPECTED_RESULT_ENTRY.key()).get());
    }

    /**
     * Tests {@link MetaStorageService#get(ByteArray, long)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testGetWithUpperBoundRevision() throws Exception {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public @NotNull org.apache.ignite.internal.metastorage.server.Entry get(byte[] key, long rev) {
                        return EXPECTED_SRV_RESULT_ENTRY;
                    }
                });

        assertEquals(
                EXPECTED_RESULT_ENTRY,
                metaStorageSvc.get(EXPECTED_RESULT_ENTRY.key(), EXPECTED_RESULT_ENTRY.revision()).get()
        );
    }

    /**
     * Tests {@link MetaStorageService#getAll(Set)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testGetAll() throws Exception {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public @NotNull List<org.apache.ignite.internal.metastorage.server.Entry> getAll(List<byte[]> keys) {
                        return new ArrayList<>(EXPECTED_SRV_RESULT_COLL);
                    }
                });

        assertEquals(EXPECTED_RESULT_MAP, metaStorageSvc.getAll(EXPECTED_RESULT_MAP.keySet()).get());
    }

    /**
     * Tests {@link MetaStorageService#getAll(Set, long)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testGetAllWithUpperBoundRevision() throws Exception {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public @NotNull List<org.apache.ignite.internal.metastorage.server.Entry> getAll(List<byte[]> keys, long revUpperBound) {
                        return new ArrayList<>(EXPECTED_SRV_RESULT_COLL);
                    }
                });

        assertEquals(
                EXPECTED_RESULT_MAP,
                metaStorageSvc.getAll(EXPECTED_RESULT_MAP.keySet(), 10).get()
        );
    }

    /**
     * Tests {@link MetaStorageService#put(ByteArray, byte[])}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPut() throws Exception {
        ByteArray expKey = new ByteArray(new byte[]{1});

        byte[] expVal = new byte[]{2};

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @SuppressWarnings("JavaAbbreviationUsage")
                    @Override public void put(byte[] key, byte[] value) {
                        assertArrayEquals(expKey.bytes(), key);

                        assertArrayEquals(expVal, value);
                    }
                });

        metaStorageSvc.put(expKey, expVal).get();
    }

    /**
     * Tests {@link MetaStorageService#getAndPut(ByteArray, byte[])}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndPut() throws Exception {
        byte[] expVal = new byte[]{2};

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @SuppressWarnings("JavaAbbreviationUsage")
                    @Override public @NotNull org.apache.ignite.internal.metastorage.server.Entry getAndPut(byte[] key, byte[] value) {
                        assertArrayEquals(EXPECTED_RESULT_ENTRY.key().bytes(), key);

                        assertArrayEquals(expVal, value);

                        return EXPECTED_SRV_RESULT_ENTRY;
                    }
                });

        assertEquals(
                EXPECTED_RESULT_ENTRY,
                metaStorageSvc.getAndPut(EXPECTED_RESULT_ENTRY.key(), expVal).get()
        );
    }

    /**
     * Tests {@link MetaStorageService#putAll(Map)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testPutAll() throws Exception {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public void putAll(List<byte[]> keys, List<byte[]> values) {
                        // Assert keys equality.
                        assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());

                        List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
                                map(ByteArray::bytes).collect(Collectors.toList());

                        for (int i = 0; i < expKeys.size(); i++)
                            assertArrayEquals(expKeys.get(i), keys.get(i));


                        // Assert values equality.
                        assertEquals(EXPECTED_RESULT_MAP.values().size(), values.size());

                        List<byte[]> expVals = EXPECTED_RESULT_MAP.values().stream().
                                map(Entry::value).collect(Collectors.toList());

                        for (int i = 0; i < expKeys.size(); i++)
                            assertArrayEquals(expVals.get(i), values.get(i));
                    }
                });

        metaStorageSvc.putAll(
                EXPECTED_RESULT_MAP.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                e -> e.getValue().value())
                        )
        ).get();
    }

    /**
     * Tests {@link MetaStorageService#getAndPutAll(Map)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndPutAll() throws Exception {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public @NotNull List<org.apache.ignite.internal.metastorage.server.Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
                        // Assert keys equality.
                        assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());

                        List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
                                map(ByteArray::bytes).collect(Collectors.toList());

                        for (int i = 0; i < expKeys.size(); i++)
                            assertArrayEquals(expKeys.get(i), keys.get(i));

                        // Assert values equality.
                        assertEquals(EXPECTED_RESULT_MAP.values().size(), values.size());

                        List<byte[]> expVals = EXPECTED_RESULT_MAP.values().stream().
                                map(Entry::value).collect(Collectors.toList());

                        for (int i = 0; i < expKeys.size(); i++)
                            assertArrayEquals(expVals.get(i), values.get(i));

                        return new ArrayList<>(EXPECTED_SRV_RESULT_COLL);
                    }
                });

        Map<ByteArray, Entry> gotRes = metaStorageSvc.getAndPutAll(
                EXPECTED_RESULT_MAP.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                e -> e.getValue().value())
                        )
        ).get();

        assertEquals(EXPECTED_RESULT_MAP, gotRes);
    }

    /**
     * Tests {@link MetaStorageService#remove(ByteArray)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRemove() throws Exception {
        ByteArray expKey = new ByteArray(new byte[]{1});

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public void remove(byte[] key) {
                        assertArrayEquals(expKey.bytes(), key);
                    }
                });

        metaStorageSvc.remove(expKey).get();
    }

    /**
     * Tests {@link MetaStorageService#getAndRemove(ByteArray)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndRemove() throws Exception {
        EntryImpl expRes = new EntryImpl(
                new ByteArray(new byte[]{1}),
                new byte[]{3},
                10,
                2
        );

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public @NotNull org.apache.ignite.internal.metastorage.server.Entry getAndRemove(byte[] key) {
                        assertArrayEquals(expRes.key().bytes(), key);

                        return new org.apache.ignite.internal.metastorage.server.Entry(
                                expRes.key().bytes(),
                                expRes.value(),
                                expRes.revision(),
                                expRes.updateCounter()
                        );
                    }
                });

        assertEquals(expRes, metaStorageSvc.getAndRemove(expRes.key()).get());
    }

    /**
     * Tests {@link MetaStorageService#removeAll(Set)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRemoveAll() throws Exception {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public void removeAll(List<byte[]> keys) {
                        assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());

                        List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
                                map(ByteArray::bytes).collect(Collectors.toList());

                        for (int i = 0; i < expKeys.size(); i++)
                            assertArrayEquals(expKeys.get(i), keys.get(i));
                    }
                });

        metaStorageSvc.removeAll(EXPECTED_RESULT_MAP.keySet()).get();
    }

    /**
     * Tests {@link MetaStorageService#getAndRemoveAll(Set)}.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testGetAndRemoveAll() throws Exception {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public @NotNull List<org.apache.ignite.internal.metastorage.server.Entry> getAndRemoveAll(List<byte[]> keys) {
                        // Assert keys equality.
                        assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());

                        List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
                                map(ByteArray::bytes).collect(Collectors.toList());

                        for (int i = 0; i < expKeys.size(); i++)
                            assertArrayEquals(expKeys.get(i), keys.get(i));

                        return new ArrayList<>(EXPECTED_SRV_RESULT_COLL);
                    }
                });

        Map<ByteArray, Entry> gotRes = metaStorageSvc.getAndRemoveAll(EXPECTED_RESULT_MAP.keySet()).get();

        assertEquals(EXPECTED_RESULT_MAP, gotRes);
    }

    /**
     * Tests {@link MetaStorageService#range(ByteArray, ByteArray, long)}} with not null keyTo and explicit revUpperBound.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRangeWitKeyToAndUpperBound() throws Exception {
        ByteArray expKeyFrom = new ByteArray(new byte[]{1});

        ByteArray expKeyTo = new ByteArray(new byte[]{3});

        long expRevUpperBound = 10;

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
                        assertArrayEquals(expKeyFrom.bytes(), keyFrom);

                        assertArrayEquals(expKeyTo.bytes(), keyTo);

                        assertEquals(expRevUpperBound, revUpperBound);

                        return new Cursor<>() {
                            private final Iterator<org.apache.ignite.internal.metastorage.server.Entry> it = new Iterator<>() {
                                @Override public boolean hasNext() {
                                    return false;
                                }

                                @Override public org.apache.ignite.internal.metastorage.server.Entry next() {
                                    return null;
                                }
                            };


                            @Override public void close() throws Exception {

                            }

                            @NotNull @Override public Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
                                return it;
                            }

                            @Override
                            public boolean hasNext() {
                                return it.hasNext();
                            }

                            @Override
                            public org.apache.ignite.internal.metastorage.server.Entry next() {
                                return it.next();
                            }
                        };
                    }
                });

        metaStorageSvc.range(expKeyFrom, expKeyTo, expRevUpperBound).close();
    }

    /**
     * Tests {@link MetaStorageService#range(ByteArray, ByteArray, long)}} with not null keyTo.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRangeWitKeyTo() throws Exception {
        ByteArray expKeyFrom = new ByteArray(new byte[]{1});

        ByteArray expKeyTo = new ByteArray(new byte[]{3});

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
                        assertArrayEquals(expKeyFrom.bytes(), keyFrom);

                        assertArrayEquals(expKeyTo.bytes(), keyTo);

                        assertEquals(LATEST_REVISION, revUpperBound);

                        return new Cursor<>() {
                            private final Iterator<org.apache.ignite.internal.metastorage.server.Entry> it = new Iterator<>() {
                                @Override
                                public boolean hasNext() {
                                    return false;
                                }

                                @Override
                                public org.apache.ignite.internal.metastorage.server.Entry next() {
                                    return null;
                                }
                            };

                            @Override public void close() throws Exception {

                            }

                            @NotNull @Override public Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
                                return it;
                            }

                            @Override public boolean hasNext() {
                                return it.hasNext();
                            }

                            @Override public org.apache.ignite.internal.metastorage.server.Entry next() {
                                return it.next();
                            }
                        };
                    }
                });

        metaStorageSvc.range(expKeyFrom, expKeyTo).close();
    }

    /**
     * Tests {@link MetaStorageService#range(ByteArray, ByteArray, long)}} with null keyTo.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRangeWitNullAsKeyTo() throws Exception {
        ByteArray expKeyFrom = new ByteArray(new byte[]{1});

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
                        assertArrayEquals(expKeyFrom.bytes(), keyFrom);

                        assertNull(keyTo);

                        assertEquals(LATEST_REVISION, revUpperBound);

                        return new Cursor<>() {
                            private final Iterator<org.apache.ignite.internal.metastorage.server.Entry> it = new Iterator<>() {
                                @Override public boolean hasNext() {
                                    return false;
                                }

                                @Override public org.apache.ignite.internal.metastorage.server.Entry next() {
                                    return null;
                                }
                            };

                            @Override public void close() throws Exception {

                            }

                            @NotNull @Override public Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
                                return it;
                            }

                            @Override public boolean hasNext() {
                                return it.hasNext();
                            }

                            @Override
                            public org.apache.ignite.internal.metastorage.server.Entry next() {
                                return it.next();
                            }
                        };
                    }
                });

        metaStorageSvc.range(expKeyFrom, null).close();
    }

    /**
     * Tests {@link MetaStorageService#range(ByteArray, ByteArray, long)}} hasNext.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRangeHasNext() throws Exception {
        ByteArray expKeyFrom = new ByteArray(new byte[]{1});

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
                        return new Cursor<>() {
                            private final Iterator<org.apache.ignite.internal.metastorage.server.Entry> it = new Iterator<>() {
                                @Override public boolean hasNext() {
                                    return true;
                                }

                                @Override public org.apache.ignite.internal.metastorage.server.Entry next() {
                                    return null;
                                }
                            };

                            @Override public void close() throws Exception {

                            }

                            @NotNull @Override public Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
                                return it;
                            }

                            @Override public boolean hasNext() {
                                return it.hasNext();
                            }

                            @Override
                            public org.apache.ignite.internal.metastorage.server.Entry next() {
                                return it.next();
                            }
                        };
                    }
                });

        Cursor<Entry> cursor = metaStorageSvc.range(expKeyFrom, null);

        assertTrue(cursor.iterator().hasNext());
    }

    /**
     * Tests {@link MetaStorageService#range(ByteArray, ByteArray, long)}} next.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRangeNext() throws Exception {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
                        return new Cursor<>() {
                            private final Iterator<org.apache.ignite.internal.metastorage.server.Entry> it = new Iterator<>() {
                                @Override public boolean hasNext() {
                                    return true;
                                }

                                @Override public org.apache.ignite.internal.metastorage.server.Entry next() {
                                    return EXPECTED_SRV_RESULT_ENTRY;
                                }
                            };

                            @Override public void close() throws Exception {

                            }

                            @NotNull @Override public Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
                                return it;
                            }

                            @Override public boolean hasNext() {
                                return it.hasNext();
                            }

                            @Override
                            public org.apache.ignite.internal.metastorage.server.Entry next() {
                                return it.next();
                            }
                        };
                    }
                });

        Cursor<Entry> cursor = metaStorageSvc.range(EXPECTED_RESULT_ENTRY.key(), null);

        assertEquals(EXPECTED_RESULT_ENTRY, (cursor.iterator().next()));
    }

    /**
     * Tests {@link MetaStorageService#range(ByteArray, ByteArray, long)}} close.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRangeClose() throws Exception {
        ByteArray expKeyFrom = new ByteArray(new byte[]{1});

        Cursor cursorMock = mock(Cursor.class);

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
                        return cursorMock;
                    }
                });

        Cursor<Entry> cursor = metaStorageSvc.range(expKeyFrom, null);

        cursor.close();

        verify(cursorMock, times(1)).close();
    }

    @Test
    public void testWatchOnUpdate() throws Exception {
        org.apache.ignite.internal.metastorage.server.WatchEvent returnedWatchEvents = new org.apache.ignite.internal.metastorage.server.WatchEvent(List.of(
                new org.apache.ignite.internal.metastorage.server.EntryEvent(
                        new org.apache.ignite.internal.metastorage.server.Entry(
                                new byte[]{2},
                                new byte[]{20},
                                1,
                                1
                        ),
                        new org.apache.ignite.internal.metastorage.server.Entry(
                                new byte[]{2},
                                new byte[]{21},
                                2,
                                4
                        )
                ),
                new org.apache.ignite.internal.metastorage.server.EntryEvent(
                        new org.apache.ignite.internal.metastorage.server.Entry(
                                new byte[] {3},
                                new byte[] {20},
                                1,
                                2
                        ),
                        new org.apache.ignite.internal.metastorage.server.Entry(
                                new byte[] {3},
                                new byte[]{},
                                2,
                                5
                        )
                ),
                new org.apache.ignite.internal.metastorage.server.EntryEvent(
                        new org.apache.ignite.internal.metastorage.server.Entry(
                                new byte[] {4},
                                new byte[] {20},
                                1,
                                3
                        ),
                        new org.apache.ignite.internal.metastorage.server.Entry(
                                new byte[] {4},
                                new byte[] {},
                                3,
                                6
                        )
                )
        ));

        ByteArray keyFrom = new ByteArray(new byte[]{1});

        ByteArray keyTo = new ByteArray(new byte[]{10});

        long rev = 2;

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
                        return new Cursor<>() {
                            private final Iterator<org.apache.ignite.internal.metastorage.server.WatchEvent> it = new Iterator<>() {
                                @Override public boolean hasNext() {

                                    return retirevedItemCnt.get() < returnedWatchEvents.entryEvents().size();
                                }

                                @Override public org.apache.ignite.internal.metastorage.server.WatchEvent next() {
                                    return returnedWatchEvents;
                                }
                            };

                            AtomicInteger retirevedItemCnt = new AtomicInteger(0);

                            @Override public void close() throws Exception {
                                // No-op.
                            }

                            @NotNull @Override public Iterator<org.apache.ignite.internal.metastorage.server.WatchEvent> iterator() {
                                return it;
                            }

                            @Override public boolean hasNext() {
                                return it.hasNext();
                            }

                            @Override
                            public org.apache.ignite.internal.metastorage.server.WatchEvent next() {
                                return it.next();
                            }
                        };
                    }
                });

        CountDownLatch latch = new CountDownLatch(1);

        IgniteUuid watchId = metaStorageSvc.watch(keyFrom, keyTo, rev, new WatchListener() {
            @Override public boolean onUpdate(@NotNull WatchEvent events) {
                List gotEvents = new ArrayList();

                List returnedWatchEvents = new ArrayList(events.entryEvents());

                Iterator<EntryEvent> iter = events.entryEvents().iterator();

                while (iter.hasNext())
                    gotEvents.add(iter.next());

                assertEquals(3, gotEvents.size());

                assertTrue(gotEvents.contains(returnedWatchEvents.get(0)));

                assertTrue(gotEvents.contains(returnedWatchEvents.get(1)));

                latch.countDown();

                return true;
            }

            @Override public void onError(@NotNull Throwable e) {
                // Within given test it's not expected to get here.
                fail();
            }
        }).get();

        latch.await();

        metaStorageSvc.stopWatch(watchId).get();
    }

    @Test
    public void testInvoke() throws Exception {
        ByteArray expKey = new ByteArray(new byte[]{1});

        byte[] expVal = new byte[]{2};

        Condition condition = Conditions.notExists(expKey);

        Operation success = Operations.put(expKey, expVal);

        Operation failure = Operations.noop();

        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public boolean invoke(
                            org.apache.ignite.internal.metastorage.server.Condition cond,
                            Collection<org.apache.ignite.internal.metastorage.server.Operation> success,
                            Collection<org.apache.ignite.internal.metastorage.server.Operation> failure) {
                        assertArrayEquals(expKey.bytes(), cond.key());

                        assertArrayEquals(expKey.bytes(), success.iterator().next().key());
                        assertArrayEquals(expVal, success.iterator().next().value());

                        assertEquals(OperationType.NO_OP, failure.iterator().next().type());

                        return true;
                    }
                });

        assertTrue(metaStorageSvc.invoke(condition, success, failure).get());
    }

    // TODO: IGNITE-14693 Add tests for exception handling logic: onError,
    // TODO: (CompactedException | OperationTimeoutException)

    /**
     * Tests {@link MetaStorageService#get(ByteArray)}.
     *
     * @throws Exception If failed.
     */
    @Disabled // TODO: IGNITE-14693 Add tests for exception handling logic.
    @Test
    public void testGetThatThrowsCompactedException() {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public @NotNull org.apache.ignite.internal.metastorage.server.Entry get(byte[] key) {
                        throw new org.apache.ignite.internal.metastorage.server.CompactedException();
                    }
                });

        assertThrows(CompactedException.class, () -> metaStorageSvc.get(EXPECTED_RESULT_ENTRY.key()).get());
    }

    /**
     * Tests {@link MetaStorageService#get(ByteArray)}.
     *
     * @throws Exception If failed.
     */
    @Disabled // TODO: IGNITE-14693 Add tests for exception handling logic.
    @Test
    public void testGetThatThrowsOperationTimeoutException() {
        MetaStorageService metaStorageSvc = prepareMetaStorage(
                new AbstractKeyValueStorage() {
                    @Override public @NotNull org.apache.ignite.internal.metastorage.server.Entry get(byte[] key) {
                        throw new OperationTimeoutException();
                    }
                });

        assertThrows(OperationTimeoutException.class, () -> metaStorageSvc.get(EXPECTED_RESULT_ENTRY.key()).get());
    }

    /**
     * @param name Node name.
     * @param port Local port.
     * @param srvs Server nodes of the cluster.
     * @return The client cluster view.
     */
    private ClusterService startClusterNode(String name, int port, List<String> srvs) {
        var ctx = new ClusterLocalConfiguration(name, port, srvs, SERIALIZATION_REGISTRY);

        var net = NETWORK_FACTORY.createClusterService(ctx);

        net.start();

        return net;
    }

    /**
     * @param cluster The cluster.
     * @param exp Expected count.
     * @param timeout The timeout in millis.
     * @return {@code True} if topology size is equal to expected.
     */
    @SuppressWarnings("SameParameterValue")
    private boolean waitForTopology(ClusterService cluster, int exp, int timeout) {
        long stop = System.currentTimeMillis() + timeout;

        while (System.currentTimeMillis() < stop) {
            if (cluster.topologyService().allMembers().size() >= exp)
                return true;

            try {
                Thread.sleep(50);
            }
            catch (InterruptedException e) {
                return false;
            }
        }

        return false;
    }

    /**
     * Prepares meta storage by instantiating corresponding raft server with MetaStorageCommandListener and
     * {@link MetaStorageServiceImpl}.
     *
     * @param keyValStorageMock {@link KeyValueStorage} mock.
     * @return {@link MetaStorageService} instance.
     */
    private MetaStorageService prepareMetaStorage(KeyValueStorage keyValStorageMock) {
        metaStorageRaftSrv = new RaftServerImpl(
                cluster.get(0),
                FACTORY,
                1000,
                Map.of(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageCommandListener(keyValStorageMock))
        );

        RaftGroupService metaStorageRaftGrpSvc = new RaftGroupServiceImpl(
                METASTORAGE_RAFT_GROUP_NAME,
                cluster.get(1),
                FACTORY,
                10_000,
                List.of(new Peer(cluster.get(0).topologyService().localMember())),
                true,
                200
        );

        return new MetaStorageServiceImpl(metaStorageRaftGrpSvc);
    }

    /**
     * Abstract {@link KeyValueStorage}. Used for tests purposes.
     */
    @SuppressWarnings("JavaAbbreviationUsage")
    private abstract class AbstractKeyValueStorage implements KeyValueStorage {
        /** {@inheritDoc} */
        @Override public long revision() {
            return 0;
        }

        /** {@inheritDoc} */
        @Override public long updateCounter() {
            return 0;
        }

        /** {@inheritDoc} */
        @Override public @NotNull org.apache.ignite.internal.metastorage.server.Entry get(byte[] key) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public @NotNull org.apache.ignite.internal.metastorage.server.Entry get(byte[] key, long rev) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public @NotNull Collection<org.apache.ignite.internal.metastorage.server.Entry> getAll(List<byte[]> keys) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public @NotNull Collection<org.apache.ignite.internal.metastorage.server.Entry> getAll(List<byte[]> keys, long revUpperBound) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public void put(byte[] key, byte[] value) {
            fail();
        }

        /** {@inheritDoc} */
        @Override public @NotNull org.apache.ignite.internal.metastorage.server.Entry getAndPut(byte[] key, byte[] value) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public void putAll(List<byte[]> keys, List<byte[]> values) {
            fail();
        }

        /** {@inheritDoc} */
        @Override public @NotNull Collection<org.apache.ignite.internal.metastorage.server.Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public void remove(byte[] key) {
            fail();
        }

        /** {@inheritDoc} */
        @Override public @NotNull org.apache.ignite.internal.metastorage.server.Entry getAndRemove(byte[] key) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public void removeAll(List<byte[]> keys) {
            fail();
        }

        /** {@inheritDoc} */
        @Override public @NotNull Collection<org.apache.ignite.internal.metastorage.server.Entry> getAndRemoveAll(List<byte[]> keys) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public boolean invoke(
                org.apache.ignite.internal.metastorage.server.Condition condition,
                Collection<org.apache.ignite.internal.metastorage.server.Operation> success,
                Collection<org.apache.ignite.internal.metastorage.server.Operation> failure
        ) {
            fail();

            return false;
        }

        /** {@inheritDoc} */
        @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(byte[] key, long rev) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public Cursor<org.apache.ignite.internal.metastorage.server.WatchEvent> watch(Collection<byte[]> keys, long rev) {
            fail();

            return null;
        }

        /** {@inheritDoc} */
        @Override public void compact() {
            fail();
        }
    }
}
