/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.persistence.wal;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.ONHEAP;

/**
 *
 */
public class SegmentedRingByteBufferTest extends GridCommonAbstractTest {
    /**
     * @throws Exception If failed.
     */
    @Test
    public void testAligned() throws Exception {
        doTestAligned(ONHEAP);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testAlignedDirect() throws Exception {
        doTestAligned(DIRECT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testNotAligned() throws Exception {
        doTestNotAligned(ONHEAP);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testNotAlignedDirect() throws Exception {
        doTestNotAligned(DIRECT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testNoOverflowMultiThreaded() throws Exception {
        doTestNoOverflowMultiThreaded(ONHEAP);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testNoOverflowMultiThreadedDirect() throws Exception {
        doTestNoOverflowMultiThreaded(DIRECT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testMultiThreaded() throws Exception {
        doTestMultiThreaded(ONHEAP);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testMultiThreadedDirect() throws Exception {
        doTestMultiThreaded(DIRECT);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testMultiThreaded2() throws Exception {
        doTestMultiThreaded2(ONHEAP);
    }

    /**
     * @throws Exception If failed.
     */
    @Test
    public void testMultiThreadedDirect2() throws Exception {
        doTestMultiThreaded2(DIRECT);
    }

    /**
     * @param mode Mode.
     */
    private void doTestAligned(SegmentedRingByteBuffer.BufferMode mode) {
        int cap = 128;

        int size = 8;

        SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);

        assertNull(buf.poll());

        // Head and tail in initial state.
        for (int j = 0; j < 2; j++) {
            for (int i = 0; i < cap / size; i++) {
                SegmentedRingByteBuffer.WriteSegment seg = buf.offer(size);

                ByteBuffer bbuf = seg.buffer();

                assertEquals(size * i, bbuf.position());
                assertEquals(size * (i + 1), bbuf.limit());

                bbuf.putLong(i + (j * 10));

                seg.release();
            }

            assertNull(buf.offer(size));

            List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();

            ByteBuffer bbuf = segs.get(0).buffer();

            assertEquals(cap, bbuf.remaining());

            for (int i = 0; i < cap / size; i++)
                assertEquals(i + (j * 10), bbuf.getLong());

            segs.get(0).release();

            assertEquals(0, bbuf.remaining());
            assertNull(buf.poll());
        }

        // Move tail.
        for (int i = 0; i < 2; i++) {
            SegmentedRingByteBuffer.WriteSegment seg = buf.offer(size);

            ByteBuffer bbuf = seg.buffer();

            assertEquals(size * i, bbuf.position());
            assertEquals(size * (i + 1), bbuf.limit());

            bbuf.putLong(i);

            seg.release();
        }

        // Move head to tail.
        List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();

        ByteBuffer bbuf = segs.get(0).buffer();

        assertEquals(size * 2, bbuf.remaining());

        for (int i = 0; i < 2; i++)
            assertEquals(i, bbuf.getLong());

        segs.get(0).release();

        assertEquals(0, bbuf.remaining());
        assertNull(buf.poll());
    }

    /**
     * @param mode Mode.
     */
    private void doTestNotAligned(SegmentedRingByteBuffer.BufferMode mode) {
        int size = 8;

        int cap = 32 - size / 2; // 3.5 long values.

        SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);

        assertNull(buf.poll());

        // Write 2 segments.
        SegmentedRingByteBuffer.WriteSegment wseg;
        List<SegmentedRingByteBuffer.ReadSegment> rsegs;
        ByteBuffer bbuf;

        wseg = buf.offer(size);
        bbuf = wseg.buffer();

        bbuf.putLong(1);

        wseg.release();

        wseg = buf.offer(size);

        bbuf = wseg.buffer();

        bbuf.putLong(2);

        wseg.release();

        // Read 2 segments.
        rsegs = buf.poll();
        bbuf = rsegs.get(0).buffer();

        assertEquals(1, bbuf.getLong());
        assertEquals(2, bbuf.getLong());

        rsegs.get(0).release();

        assertNull(buf.poll());

        // Write 2 segments.
        wseg = buf.offer(size);
        bbuf = wseg.buffer();

        bbuf.putLong(3);

        wseg.release();

        // This one will overflow buffer.
        wseg = buf.offer(size);
        bbuf = wseg.buffer();

        bbuf.putLong(4);

        wseg.release();

        // Ring buffer should return two separate segments instead of one due to an overflow.
        rsegs = buf.poll();
        bbuf = rsegs.get(0).buffer();

        // First segment.
        assertEquals(3, bbuf.getLong());
        assertEquals(4, bbuf.remaining());

        int pos = bbuf.position();

        byte[] tmp = new byte[8];

        byte[] arr = new byte[bbuf.capacity()];

        bbuf.position(0);

        bbuf.limit(bbuf.capacity());

        bbuf.get(arr);

        System.arraycopy(arr, pos, tmp, 0, 4);

        // One more segment available.
        bbuf = rsegs.get(1).buffer();

        assertEquals(4, bbuf.remaining());

        bbuf.position(0);

        bbuf.limit(bbuf.capacity());

        bbuf.get(arr);

        System.arraycopy(arr, 0, tmp, 4, 4);

        ByteBuffer bb = ByteBuffer.wrap(tmp);

        bb.order(ByteOrder.nativeOrder());

        assertEquals(4, bb.getLong());

        rsegs.get(1).release();

        assertNull(buf.poll());
    }

    /**
     * @param mode Mode.
     */
    private void doTestNoOverflowMultiThreaded(
        SegmentedRingByteBuffer.BufferMode mode
    ) throws org.apache.ignite.IgniteCheckedException, BrokenBarrierException, InterruptedException {
        int producerCnt = 16;

        final int cap = 256 * 1024;

        final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);

        final AtomicBoolean stop = new AtomicBoolean(false);

        final AtomicReference<Throwable> ex = new AtomicReference<>();

        final CyclicBarrier startBarrier = new CyclicBarrier(producerCnt);

        final CyclicBarrier restartBarrier = new CyclicBarrier(producerCnt + 1);

        final AtomicLong totalWritten = new AtomicLong();

        IgniteInternalFuture<Long> fut;

        try {
            fut = GridTestUtils.runMultiThreadedAsync(() -> {
                try {
                    try {
                        startBarrier.await();
                    }
                    catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();

                        fail();
                    }

                    while (!stop.get()) {

                        TestObject obj = new TestObject();

                        SegmentedRingByteBuffer.WriteSegment seg = buf.offer(obj.size());

                        ByteBuffer bbuf;

                        if (seg == null) {
                            try {
                                restartBarrier.await(getTestTimeout(), TimeUnit.MILLISECONDS);
                            } catch (InterruptedException | TimeoutException | BrokenBarrierException e) {
                                break;
                            }

                            continue;
                        }

                        bbuf = seg.buffer();

                        assertEquals(obj.size(), bbuf.remaining());

                        bbuf.putLong(obj.id);
                        bbuf.putInt(obj.len);
                        bbuf.put(obj.arr);

                        assertEquals(0, bbuf.remaining());

                        seg.release();

                        long total = totalWritten.addAndGet(obj.size());

                        assertTrue(total <= cap);
                    }
                }
                catch (Throwable th) {
                    ex.compareAndSet(null, th);
                }
            }, producerCnt, "producer-thread");

            long endTime = System.currentTimeMillis() + 60 * 1000L;

            while (System.currentTimeMillis() < endTime && ex.get() == null) {
                while (restartBarrier.getNumberWaiting() != producerCnt && ex.get() == null)
                    U.sleep(10);

                if (ex.get() != null)
                    fail("Exception in producer thread, ex=" + ex.get());

                List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll();

                if (segs != null) {
                    for (SegmentedRingByteBuffer.ReadSegment seg : segs)
                        seg.release();
                }

                totalWritten.set(0);

                restartBarrier.await();
            }
        }
        finally {
            stop.set(true);

            restartBarrier.reset();
        }

        fut.get();

        if (ex.get() != null)
            fail("Exception in producer thread, ex=" + ex.get());
    }

    /**
     * @param mode Mode.
     */
    private void doTestMultiThreaded(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException {
        int producerCnt = 16;

        final int cap = 256 * 1024;

        final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);

        final AtomicBoolean stop = new AtomicBoolean(false);

        final AtomicReference<Throwable> ex = new AtomicReference<>();

        final CyclicBarrier barrier = new CyclicBarrier(producerCnt);

        IgniteInternalFuture<Long> fut;

        try {
            fut = GridTestUtils.runMultiThreadedAsync(() -> {
                try {
                    try {
                        barrier.await();
                    }
                    catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();

                        fail();
                    }

                    while (!stop.get()) {
                        TestObject obj = new TestObject();

                        SegmentedRingByteBuffer.WriteSegment seg;
                        ByteBuffer bbuf;

                        for (;;) {
                            if (stop.get())
                                return;

                            seg = buf.offer(obj.size());

                            if (seg != null)
                                break;
                        }

                        try {
                            bbuf = seg.buffer();

                            assertEquals(obj.size(), bbuf.remaining());

                            bbuf.putLong(obj.id);
                            bbuf.putInt(obj.len);
                            bbuf.put(obj.arr);

                            assertEquals(0, bbuf.remaining());

                        }
                        finally {
                            seg.release();
                        }
                    }
                }
                catch (Throwable th) {
                    ex.compareAndSet(null, th);
                }
            }, producerCnt, "producer-thread");

            Random rnd = new Random();

            long endTime = System.currentTimeMillis() + 60 * 1000L;

            while (System.currentTimeMillis() < endTime && ex.get() == null) {
                try {
                    U.sleep(rnd.nextInt(100) + 1);
                }
                catch (IgniteInterruptedCheckedException e) {
                    e.printStackTrace();
                }

                List<SegmentedRingByteBuffer.ReadSegment> segs;

                if ((segs = buf.poll()) != null) {
                    for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
                        assertTrue(seg.buffer().hasRemaining());

                        seg.release();
                    }
                }
            }
        }
        finally {
            stop.set(true);
        }

        fut.get();

        if (ex.get() != null)
            fail("Exception in producer thread, ex=" + ex.get());
    }

    /**
     * @param mode Mode.
     */
    private void doTestMultiThreaded2(SegmentedRingByteBuffer.BufferMode mode) throws org.apache.ignite.IgniteCheckedException {
        int producerCnt = 16;

        final int cap = 256 * 1024;

        final SegmentedRingByteBuffer buf = new SegmentedRingByteBuffer(cap, Long.MAX_VALUE, mode);

        final AtomicReference<Throwable> ex = new AtomicReference<>();

        final AtomicBoolean stop = new AtomicBoolean(false);

        final CyclicBarrier barrier = new CyclicBarrier(producerCnt);

        final Set<TestObject> items = Collections.newSetFromMap(new ConcurrentHashMap<TestObject, Boolean>());

        IgniteInternalFuture<Long> fut;

        try {
            fut = GridTestUtils.runMultiThreadedAsync(() -> {
                try {
                    try {
                        barrier.await();
                    }
                    catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();

                        fail();
                    }

                    while (!stop.get()) {
                        TestObject obj = new TestObject();

                        SegmentedRingByteBuffer.WriteSegment seg;
                        ByteBuffer bbuf;

                        for (;;) {
                            if (stop.get())
                                return;

                            seg = buf.offer(obj.size());

                            if (seg != null)
                                break;
                        }

                        try {
                            bbuf = seg.buffer();

                            assertEquals(obj.size(), bbuf.remaining());

                            bbuf.putLong(obj.id);
                            bbuf.putInt(obj.len);
                            bbuf.put(obj.arr);

                            assertEquals(0, bbuf.remaining());

                            assertTrue("Ooops! The same value is already exist in Set! ", items.add(obj));
                        }
                        finally {
                            seg.release();
                        }
                    }
                }
                catch (Throwable th) {
                    ex.compareAndSet(null, th);
                }
            }, producerCnt, "producer-thread");

            Random rnd = new Random();

            long endTime = System.currentTimeMillis() + 60 * 1000L;

            while (System.currentTimeMillis() < endTime && ex.get() == null) {
                try {
                    U.sleep(rnd.nextInt(100) + 1);
                }
                catch (IgniteInterruptedCheckedException e) {
                    e.printStackTrace();
                }

                List<SegmentedRingByteBuffer.ReadSegment> segs;

                while ((segs = buf.poll()) != null) {
                    int size = 0;

                    for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
                        ByteBuffer bbuf = seg.buffer();

                        assertTrue(bbuf.hasRemaining());

                        size += bbuf.remaining();
                    }

                    byte[] arr = new byte[size];

                    int idx = 0;

                    for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
                        ByteBuffer bbuf = seg.buffer();

                        assertTrue(bbuf.hasRemaining());

                        int len = bbuf.remaining();

                        bbuf.get(arr, idx, len);

                        idx += len;
                    }

                    ByteBuffer bbuf = ByteBuffer.wrap(arr);

                    bbuf.order(ByteOrder.nativeOrder());

                    assertTrue(bbuf.hasRemaining());

                    while (bbuf.hasRemaining()) {
                        long id = bbuf.getLong();

                        int len = bbuf.getInt();

                        arr = new byte[len];

                        bbuf.get(arr);

                        TestObject obj = new TestObject(id, arr);

                        assertTrue(items.remove(obj));
                    }

                    for (SegmentedRingByteBuffer.ReadSegment seg : segs)
                        seg.release();
                }
            }
        }
        finally {
            stop.set(true);
        }

        fut.get();

        if (ex.get() != null)
            fail("Exception in producer thread, ex=" + ex.get());

        List<SegmentedRingByteBuffer.ReadSegment> segs;

        while ((segs = buf.poll()) != null) {
            int size = 0;

            for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
                ByteBuffer bbuf = seg.buffer();

                assertTrue(bbuf.hasRemaining());

                size += bbuf.remaining();
            }

            byte[] arr = new byte[size];

            int idx = 0;

            for (SegmentedRingByteBuffer.ReadSegment seg : segs) {
                ByteBuffer bbuf = seg.buffer();

                assertTrue(bbuf.hasRemaining());

                int len = bbuf.remaining();

                bbuf.get(arr, idx, len);

                idx += len;
            }

            ByteBuffer bbuf = ByteBuffer.wrap(arr);

            bbuf.order(ByteOrder.nativeOrder());

            assertTrue(bbuf.hasRemaining());

            while (bbuf.hasRemaining()) {
                long id = bbuf.getLong();

                int len = bbuf.getInt();

                arr = new byte[len];

                bbuf.get(arr);

                TestObject obj = new TestObject(id, arr);

                assertTrue(items.remove(obj));
            }

            for (SegmentedRingByteBuffer.ReadSegment seg : segs)
                seg.release();
        }

        assertNull(buf.poll());
        assertEquals(0, items.size());
    }

    /**
     *
     */
    private static class TestObject {
        /** Id. */
        private long id;

        /** Length. */
        private int len;

        /** Array. */
        private byte[] arr;

        /**
         * Default constructor.
         */
        public TestObject() {
            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            id = rnd.nextLong();
            len = rnd.nextInt(32 * 1024);
            arr = new byte[len];

            rnd.nextBytes(arr);
        }

        /**
         * @param id Id.
         * @param arr Array.
         */
        public TestObject(long id, byte[] arr) {
            this.id = id;
            this.arr = arr;

            len = arr.length;
        }

        /**
         *
         */
        public int size() {
            return 8 + 4 + arr.length;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object that) {
            if (this == that)
                return true;

            if (that == null || getClass() != that.getClass())
                return false;

            TestObject obj = (TestObject)that;

            if (id != obj.id)
                return false;

            if (len != obj.len)
                return false;

            return Arrays.equals(arr, obj.arr);

        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            int res = (int)(id ^ (id >>> 32));

            res = 31 * res + len;

            res = 31 * res + Arrays.hashCode(arr);

            return res;
        }
    }
}
