/*
 * Copyright 2017 HugeGraph Authors
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package com.baidu.hugegraph.computer.core.sort.sorter;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;

import com.baidu.hugegraph.computer.core.combiner.IntValueSumCombiner;
import com.baidu.hugegraph.computer.core.combiner.PointerCombiner;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.io.BytesInput;
import com.baidu.hugegraph.computer.core.io.BytesOutput;
import com.baidu.hugegraph.computer.core.io.IOFactory;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.sort.Sorter;
import com.baidu.hugegraph.computer.core.sort.SorterTestUtil;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvOuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.InnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.KvOuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.OuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.PeekableIterator;
import com.baidu.hugegraph.computer.core.store.KvEntryFileWriter;
import com.baidu.hugegraph.computer.core.store.StoreTestUtil;
import com.baidu.hugegraph.computer.core.store.entry.DefaultKvEntry;
import com.baidu.hugegraph.computer.core.store.entry.EntriesUtil;
import com.baidu.hugegraph.computer.core.store.entry.InlinePointer;
import com.baidu.hugegraph.computer.core.store.entry.KvEntry;
import com.baidu.hugegraph.computer.core.store.entry.Pointer;
import com.baidu.hugegraph.computer.core.store.file.hgkvfile.HgkvDir;
import com.baidu.hugegraph.computer.core.store.file.hgkvfile.HgkvDirImpl;
import com.baidu.hugegraph.computer.core.store.file.hgkvfile.builder.HgkvDirBuilderImpl;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
import com.baidu.hugegraph.testutil.Assert;
import com.baidu.hugegraph.util.Bytes;
import com.baidu.hugegraph.util.Log;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

public class SortLargeDataTest {

    private static final Logger LOG = Log.logger(SortLargeDataTest.class);
    private static Config CONFIG;

    @BeforeClass
    public static void init() {
        CONFIG = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MERGE_FILES_NUM, "200",
                ComputerOptions.HGKV_MAX_FILE_SIZE, String.valueOf(Bytes.GB),
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
    }

    @Before
    public void setup() throws IOException {
        FileUtils.deleteDirectory(new File(StoreTestUtil.FILE_DIR));
    }

    @After
    public void teardown() throws IOException {
        FileUtils.deleteDirectory(new File(StoreTestUtil.FILE_DIR));
    }

    @Test
    public void testAllProcess() throws Exception {
        StopWatch watcher = new StopWatch();
        final long bufferSize = Bytes.MB;
        final int mergeBufferNum = 300;
        final int dataSize = 1000000;
        long value = 0;

        Random random = new Random();
        BytesOutput output = IOFactory.createBytesOutput(
                             Constants.SMALL_BUF_SIZE);
        List<RandomAccessInput> buffers = new ArrayList<>(mergeBufferNum);
        List<String> mergeBufferFiles = new ArrayList<>();
        int fileNum = 10;
        Sorter sorter = SorterTestUtil.createSorter(CONFIG);

        watcher.start();
        for (int i = 0; i < dataSize; i++) {
            SorterTestUtil.writeData(output, random.nextInt(dataSize));
            int entryValue = random.nextInt(5);
            SorterTestUtil.writeData(output, entryValue);
            value = value + entryValue;

            // Write data to buffer and sort buffer
            if (output.position() >= bufferSize || (i + 1) == dataSize) {
                BytesInput input = EntriesUtil.inputFromOutput(output);
                buffers.add(sortBuffer(sorter, input));
                output.seek(0);
            }

            // Merge buffers to HgkvDir
            if (buffers.size() >= mergeBufferNum || (i + 1) == dataSize) {
                String outputFile = StoreTestUtil.availablePathById(fileNum++);
                mergeBufferFiles.add(outputFile);
                mergeBuffers(sorter, buffers, outputFile);
                buffers.clear();
            }
        }

        // Merge file
        String resultFile = StoreTestUtil.availablePathById("0");
        mergeFiles(sorter, mergeBufferFiles, Lists.newArrayList(resultFile));

        watcher.stop();
        LOG.info("testAllProcess sort time: {}", watcher.getTime());

        long result = sumOfEntryValue(sorter, ImmutableList.of(resultFile));
        Assert.assertEquals(value, result);
    }

    @Test
    public void testMergeBuffers() throws Exception {
        StopWatch watcher = new StopWatch();
        // Sort buffers total size 100M, each buffer is 50KB
        final long bufferSize = Bytes.KB * 50;
        final long bufferNum = 2000;
        final int keyRange = 10000000;
        long totalValue = 0L;

        Random random = new Random();
        List<RandomAccessInput> buffers = new ArrayList<>();
        for (int i = 0; i < bufferNum; i++) {
            BytesOutput buffer = IOFactory.createBytesOutput(
                                 Constants.SMALL_BUF_SIZE);
            while (buffer.position() < bufferSize) {
                // Write data
                int key = random.nextInt(keyRange);
                SorterTestUtil.writeData(buffer, key);
                int value = random.nextInt(100);
                SorterTestUtil.writeData(buffer, value);
                totalValue += value;
            }
            buffers.add(EntriesUtil.inputFromOutput(buffer));
        }

        // Sort buffer
        Sorter sorter = SorterTestUtil.createSorter(CONFIG);
        watcher.start();
        List<RandomAccessInput> sortedBuffers = new ArrayList<>();
        for (RandomAccessInput buffer : buffers) {
            RandomAccessInput sortedBuffer = sortBuffer(sorter, buffer);
            sortedBuffers.add(sortedBuffer);
        }
        watcher.stop();

        LOG.info("testMergeBuffers sort buffer cost time: {}",
                 watcher.getTime());

        String resultFile = StoreTestUtil.availablePathById("0");
        // Sort buffers
        watcher.reset();
        watcher.start();
        sorter.mergeBuffers(sortedBuffers, new KvOuterSortFlusher(),
                            resultFile, false);
        watcher.stop();

        LOG.info("testMergeBuffers merge buffers cost time: {}",
                 watcher.getTime());

        // Assert result
        long result = sumOfEntryValue(sorter, ImmutableList.of(resultFile));
        Assert.assertEquals(totalValue, result);
        assertFileOrder(sorter, ImmutableList.of(resultFile));
    }

    @Test
    public void testMergeBuffersAllSameKey() throws Exception {
        List<RandomAccessInput> buffers = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            BytesOutput buffer = IOFactory.createBytesOutput(
                                 Constants.SMALL_BUF_SIZE);
            for (int j = 0; j < 100; j++) {
                // Write data
                SorterTestUtil.writeData(buffer, 1);
                SorterTestUtil.writeData(buffer, 1);
            }
            buffers.add(EntriesUtil.inputFromOutput(buffer));
        }

        String resultFile = StoreTestUtil.availablePathById("0");
        Sorter sorter = SorterTestUtil.createSorter(CONFIG);
        mergeBuffers(sorter, buffers, resultFile);

        // Assert result
        long result = sumOfEntryValue(sorter, ImmutableList.of(resultFile));
        Assert.assertEquals(1000 * 100, result);
    }

    @Test
    public void testDiffNumEntriesFileMerge() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MERGE_FILES_NUM, "3",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
        List<Integer> sizeList = ImmutableList.of(200, 500, 20, 50, 300,
                                                  250, 10, 33, 900, 89, 20);
        List<String> inputs = new ArrayList<>();

        for (int j = 0; j < sizeList.size(); j++) {
            String file = StoreTestUtil.availablePathById(j + 10);
            inputs.add(file);
            try (KvEntryFileWriter builder = new HgkvDirBuilderImpl(config,
                                                                    file)) {
                for (int i = 0; i < sizeList.get(j); i++) {
                    byte[] keyBytes = StoreTestUtil.intToByteArray(i);
                    byte[] valueBytes = StoreTestUtil.intToByteArray(1);
                    Pointer key = new InlinePointer(keyBytes);
                    Pointer value = new InlinePointer(valueBytes);
                    KvEntry entry = new DefaultKvEntry(key, value);
                    builder.write(entry);
                }
            }
        }

        List<String> outputs = ImmutableList.of(
                               StoreTestUtil.availablePathById(0),
                               StoreTestUtil.availablePathById(1),
                               StoreTestUtil.availablePathById(2),
                               StoreTestUtil.availablePathById(3));
        Sorter sorter = SorterTestUtil.createSorter(config);
        sorter.mergeInputs(inputs, new KvOuterSortFlusher(), outputs, false);

        int total = sizeList.stream().mapToInt(i -> i).sum();
        int mergeTotal = 0;
        for (String output : outputs) {
            mergeTotal += HgkvDirImpl.open(output).numEntries();
        }
        Assert.assertEquals(total, mergeTotal);
    }

    private static RandomAccessInput sortBuffer(Sorter sorter,
                                                RandomAccessInput input)
                                                throws Exception {
        BytesOutput output = IOFactory.createBytesOutput(
                             Constants.SMALL_BUF_SIZE);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                                  IntValue::new,
                                                  new IntValueSumCombiner());
        InnerSortFlusher flusher = new CombineKvInnerSortFlusher(output,
                                                                 combiner);
        sorter.sortBuffer(input, flusher, false);
        return EntriesUtil.inputFromOutput(output);
    }

    private static void mergeBuffers(Sorter sorter,
                                     List<RandomAccessInput> buffers,
                                     String output) throws Exception {
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                                  IntValue::new,
                                                  new IntValueSumCombiner());
        OuterSortFlusher flusher = new CombineKvOuterSortFlusher(combiner);
        sorter.mergeBuffers(buffers, flusher, output, false);
    }

    private static void mergeFiles(Sorter sorter, List<String> files,
                                   List<String> outputs) throws Exception {
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                                  IntValue::new,
                                                  new IntValueSumCombiner());
        OuterSortFlusher flusher = new CombineKvOuterSortFlusher(combiner);
        sorter.mergeInputs(files, flusher, outputs, false);
    }

    private static long sumOfEntryValue(Sorter sorter, List<String> files)
                                        throws Exception {
        long entrySize = 0L;
        for (String file : files) {
            HgkvDir dir = HgkvDirImpl.open(file);
            entrySize += dir.numEntries();
        }
        LOG.info("Finally kvEntry size: {}", entrySize);

        try (PeekableIterator<KvEntry> iterator = sorter.iterator(files,
                                                                  false)) {
            long result = 0;
            while (iterator.hasNext()) {
                KvEntry next = iterator.next();
                result += StoreTestUtil.dataFromPointer(next.value());
            }
            return result;
        }
    }

    private static void assertFileOrder(Sorter sorter, List<String> files)
                                        throws Exception {
        KvEntry last = null;
        try (PeekableIterator<KvEntry> iterator =
                                       sorter.iterator(files, false)) {
            while (iterator.hasNext()) {
                KvEntry next = iterator.next();
                if (last == null) {
                    last = iterator.next();
                    continue;
                }
                Assert.assertLte(0, last.key().compareTo(next.key()));
            }
        }
    }
}
