/*
 * Copyright 2017 HugeGraph Authors
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package com.baidu.hugegraph.computer.core.sort.sorter;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.baidu.hugegraph.computer.core.combiner.IntValueSumCombiner;
import com.baidu.hugegraph.computer.core.combiner.PointerCombiner;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.io.BytesInput;
import com.baidu.hugegraph.computer.core.io.BytesOutput;
import com.baidu.hugegraph.computer.core.io.IOFactory;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.sort.Sorter;
import com.baidu.hugegraph.computer.core.sort.SorterTestUtil;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvOuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineSubKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineSubKvOuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.InnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.OuterSortFlusher;
import com.baidu.hugegraph.computer.core.store.EntryIterator;
import com.baidu.hugegraph.computer.core.store.KvEntryFileReader;
import com.baidu.hugegraph.computer.core.store.StoreTestUtil;
import com.baidu.hugegraph.computer.core.store.buffer.KvEntriesInput;
import com.baidu.hugegraph.computer.core.store.entry.EntriesUtil;
import com.baidu.hugegraph.computer.core.store.entry.KvEntry;
import com.baidu.hugegraph.computer.core.store.file.hgkvfile.HgkvDir;
import com.baidu.hugegraph.computer.core.store.file.hgkvfile.HgkvDirImpl;
import com.baidu.hugegraph.computer.core.store.file.hgkvfile.reader.HgkvDirReaderImpl;
import com.baidu.hugegraph.computer.core.store.file.select.DisperseEvenlySelector;
import com.baidu.hugegraph.computer.core.store.file.select.InputFilesSelector;
import com.baidu.hugegraph.computer.core.util.FileUtil;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
import com.baidu.hugegraph.iterator.CIter;
import com.baidu.hugegraph.testutil.Assert;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

public class SorterTest {

    @Before
    public void setup() throws IOException {
        FileUtils.deleteDirectory(new File(StoreTestUtil.FILE_DIR));
    }

    @After
    public void teardown() throws IOException {
        FileUtils.deleteDirectory(new File(StoreTestUtil.FILE_DIR));
    }

    @Test
    public void testSortKvBuffer() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MAX_FILE_SIZE, "32",
                ComputerOptions.HGKV_DATABLOCK_SIZE, "16",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "3"
        );
        List<Integer> map = ImmutableList.of(2, 3,
                                             1, 23,
                                             6, 2,
                                             5, 9,
                                             2, 2,
                                             6, 1,
                                             1, 20);
        BytesInput input = SorterTestUtil.inputFromKvMap(map);
        BytesOutput output = IOFactory.createBytesOutput(
                             Constants.SMALL_BUF_SIZE);

        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                                  IntValue::new,
                                                  new IntValueSumCombiner());
        sorter.sortBuffer(input,
                          new CombineKvInnerSortFlusher(output, combiner),
                          false);

        BytesInput resultInput = EntriesUtil.inputFromOutput(output);
        KvEntriesInput iter = new KvEntriesInput(resultInput);
        SorterTestUtil.assertKvEntry(iter.next(), 1, 43);
        SorterTestUtil.assertKvEntry(iter.next(), 2, 5);
        SorterTestUtil.assertKvEntry(iter.next(), 5, 9);
        SorterTestUtil.assertKvEntry(iter.next(), 6, 3);
        iter.close();
    }

    @Test
    public void testSortKvBuffers() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MAX_FILE_SIZE, "32",
                ComputerOptions.HGKV_DATABLOCK_SIZE, "16",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "3",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
        List<Integer> map1 = ImmutableList.of(2, 3,
                                              2, 1,
                                              5, 2,
                                              6, 9,
                                              6, 2);
        List<Integer> map2 = ImmutableList.of(1, 3,
                                              1, 1,
                                              3, 2,
                                              6, 9,
                                              8, 2);
        String path = StoreTestUtil.availablePathById("1");

        // Merge 4 sorted input
        List<RandomAccessInput> inputs = ImmutableList.of(
                                SorterTestUtil.inputFromKvMap(map1),
                                SorterTestUtil.inputFromKvMap(map2),
                                SorterTestUtil.inputFromKvMap(map1),
                                SorterTestUtil.inputFromKvMap(map2));
        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                                  IntValue::new,
                                                  new IntValueSumCombiner());
        sorter.mergeBuffers(inputs, new CombineKvOuterSortFlusher(combiner),
                            path, false);

        // Assert merge result from target hgkvDir
        KvEntryFileReader reader = new HgkvDirReaderImpl(path, false);
        EntryIterator iter = reader.iterator();
        SorterTestUtil.assertKvEntry(iter.next(), 1, 8);
        SorterTestUtil.assertKvEntry(iter.next(), 2, 8);
        SorterTestUtil.assertKvEntry(iter.next(), 3, 4);
        SorterTestUtil.assertKvEntry(iter.next(), 5, 4);
        SorterTestUtil.assertKvEntry(iter.next(), 6, 40);
        SorterTestUtil.assertKvEntry(iter.next(), 8, 4);
        Assert.assertFalse(iter.hasNext());
    }

    @Test
    public void testMergeKvInputs() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MAX_FILE_SIZE, "32",
                ComputerOptions.HGKV_DATABLOCK_SIZE, "16",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "3",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
        this.testMergeKvInputs(config);

        config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MAX_FILE_SIZE, "32",
                ComputerOptions.HGKV_DATABLOCK_SIZE, "16",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "3",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "true"
        );
        this.testMergeKvInputs(config);
    }

    private void testMergeKvInputs(Config config) throws Exception {
        List<Integer> map1 = ImmutableList.of(2, 3,
                                              2, 1,
                                              5, 2,
                                              6, 9,
                                              6, 2);
        List<Integer> map2 = ImmutableList.of(1, 3,
                                              1, 2,
                                              3, 2);

        // Input hgkvDirs
        String file1Name = StoreTestUtil.availablePathById("1");
        String file2Name = StoreTestUtil.availablePathById("2");
        String file3Name = StoreTestUtil.availablePathById("3");
        String file4Name = StoreTestUtil.availablePathById("4");
        String file5Name = StoreTestUtil.availablePathById("5");
        String file6Name = StoreTestUtil.availablePathById("6");
        String file7Name = StoreTestUtil.availablePathById("7");
        String file8Name = StoreTestUtil.availablePathById("8");
        String file9Name = StoreTestUtil.availablePathById("9");
        String file10Name = StoreTestUtil.availablePathById("10");

        List<String> inputs = Lists.newArrayList(file1Name, file2Name,
                                                 file3Name, file4Name,
                                                 file5Name, file6Name,
                                                 file7Name, file8Name,
                                                 file9Name, file10Name);
        // Output hgkvDirs
        String output1 = StoreTestUtil.availablePathById("20");
        String output2 = StoreTestUtil.availablePathById("21");
        List<String> outputs = ImmutableList.of(output1, output2);

        for (int i = 0; i < inputs.size(); i++) {
            List<Integer> map;
            if ((i & 1) == 0) {
                map = map1;
            } else {
                map = map2;
            }
            if (config.get(ComputerOptions.TRANSPORT_RECV_FILE_MODE)) {
                StoreTestUtil.bufferFileFromKvMap(map, inputs.get(i));
            } else {
                StoreTestUtil.hgkvDirFromKvMap(config, map, inputs.get(i));
            }
        }

        // Merge file
        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                                  IntValue::new,
                                                  new IntValueSumCombiner());
        sorter.mergeInputs(inputs, new CombineKvOuterSortFlusher(combiner),
                           outputs, false);

        // Assert sort result
        List<Integer> result = ImmutableList.of(1, 25,
                                                2, 20,
                                                3, 10,
                                                5, 10,
                                                6, 55);
        Iterator<Integer> resultIter = result.iterator();
        Iterator<KvEntry> iterator = sorter.iterator(outputs, false);
        KvEntry last = iterator.next();
        int value = StoreTestUtil.dataFromPointer(last.value());
        while (true) {
            KvEntry current = null;
            if (iterator.hasNext()) {
                current = iterator.next();
                if (last.compareTo(current) == 0) {
                    value += StoreTestUtil.dataFromPointer(current.value());
                    continue;
                }
            }

            Assert.assertEquals(StoreTestUtil.dataFromPointer(last.key()),
                                resultIter.next());
            Assert.assertEquals(value, resultIter.next());

            if (current == null) {
                break;
            }

            last = current;
            value = StoreTestUtil.dataFromPointer(last.value());
        }
        Assert.assertFalse(resultIter.hasNext());

        FileUtil.deleteFilesQuietly(inputs);
        FileUtil.deleteFilesQuietly(outputs);
    }

    private BytesInput sortedSubKvBuffer(Config config) throws Exception {
        List<Integer> kv1 = ImmutableList.of(3,
                                             2, 1,
                                             4, 1);
        List<Integer> kv2 = ImmutableList.of(1,
                                             3, 1,
                                             5, 1);
        List<Integer> kv3 = ImmutableList.of(2,
                                             8, 1,
                                             9, 1);
        List<Integer> kv4 = ImmutableList.of(3,
                                             2, 1,
                                             3, 1);
        List<Integer> kv5 = ImmutableList.of(2,
                                             5, 1,
                                             8, 1);
        List<List<Integer>> data = ImmutableList.of(kv1, kv2, kv3, kv4, kv5);

        BytesInput input = SorterTestUtil.inputFromSubKvMap(data);
        BytesOutput output = IOFactory.createBytesOutput(
                             Constants.SMALL_BUF_SIZE);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                                  IntValue::new,
                                                  new IntValueSumCombiner());
        int flushThreshold = config.get(
                             ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);
        InnerSortFlusher flusher = new CombineSubKvInnerSortFlusher(
                                       output, combiner, flushThreshold);

        Sorter sorter = SorterTestUtil.createSorter(config);
        sorter.sortBuffer(input, flusher, true);

        return EntriesUtil.inputFromOutput(output);
    }

    @Test
    public void testSortSubKvBuffer() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
            ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "2"
        );

        /*
         * Assert result
         * key 1 subKv 3 1, 5 1
         * key 2 subKv 5 1, 8 2
         * key 2 subKv 9 1
         * key 3 subKv 2 2, 3 1
         * key 3 subKv 4 1
         */
        BytesInput input = this.sortedSubKvBuffer(config);
        EntryIterator iter = new KvEntriesInput(input, true);
        SorterTestUtil.assertSubKvByKv(iter.next(), 1, 3, 1, 5, 1);
        SorterTestUtil.assertSubKvByKv(iter.next(), 2, 5, 1, 8, 2);
        SorterTestUtil.assertSubKvByKv(iter.next(), 2, 9, 1);
        SorterTestUtil.assertSubKvByKv(iter.next(), 3, 2, 2, 3, 1);
        SorterTestUtil.assertSubKvByKv(iter.next(), 3, 4, 1);
        iter.close();
    }

    @Test
    public void testSortSubKvBuffers() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "2",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
        int flushThreshold = config.get(
                             ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);

        BytesInput i1 = this.sortedSubKvBuffer(config);
        BytesInput i2 = this.sortedSubKvBuffer(config);
        BytesInput i3 = this.sortedSubKvBuffer(config);
        List<RandomAccessInput> buffers = ImmutableList.of(i1, i2, i3);

        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                                  IntValue::new,
                                                  new IntValueSumCombiner());
        OuterSortFlusher flusher = new CombineSubKvOuterSortFlusher(
                                       combiner, flushThreshold);
        flusher.sources(buffers.size());

        String outputFile = StoreTestUtil.availablePathById("1");
        sorter.mergeBuffers(buffers, flusher, outputFile, true);

        /*
         * Assert result
         * key 1 subKv 3 3, 5 3
         * key 2 subKv 5 3, 8 6
         * key 2 subKv 9 3
         * key 3 subKv 2 6, 3 3
         * key 3 subKv 4 3
         */
        ImmutableList<String> outputs = ImmutableList.of(outputFile);
        Iterator<KvEntry> kvIter = sorter.iterator(outputs, true);
        SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 3, 3, 5, 3);
        SorterTestUtil.assertSubKvByKv(kvIter.next(), 2, 5, 3, 8, 6);
        SorterTestUtil.assertSubKvByKv(kvIter.next(), 2, 9, 3);
        SorterTestUtil.assertSubKvByKv(kvIter.next(), 3, 2, 6, 3, 3);
        SorterTestUtil.assertSubKvByKv(kvIter.next(), 3, 4, 3);

        // Assert file properties
        HgkvDir dir = HgkvDirImpl.open(outputFile);
        Assert.assertEquals(5, dir.numEntries());
        Assert.assertEquals(8, dir.numSubEntries());
    }

    @Test
    public void testMergeSubKvFiles() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "2",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
        this.testMergeSubKvFiles(config);

        config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "2",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "true"
        );
        this.testMergeSubKvFiles(config);
    }

    private void testMergeSubKvFiles(Config config) throws Exception {
        int flushThreshold = config.get(
                ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);

        List<Integer> kv1 = ImmutableList.of(1,
                                             2, 1,
                                             4, 1);
        List<Integer> kv2 = ImmutableList.of(4,
                                             2, 1,
                                             3, 1);
        List<Integer> kv3 = ImmutableList.of(4,
                                             6, 1,
                                             8, 1);
        List<Integer> kv4 = ImmutableList.of(1,
                                             1, 1,
                                             2, 1);
        List<Integer> kv5 = ImmutableList.of(1,
                                             5, 1,
                                             7, 1);
        List<Integer> kv6 = ImmutableList.of(2,
                                             2, 1,
                                             5, 1);

        List<List<Integer>> data1 = ImmutableList.of(kv1, kv2, kv3);
        List<List<Integer>> data2 = ImmutableList.of(kv4, kv5, kv6);
        List<List<Integer>> data3 = ImmutableList.of(kv4, kv1, kv3);
        List<List<List<Integer>>> datas = ImmutableList.of(data1, data2, data3);

        String input1 = StoreTestUtil.availablePathById(1);
        String input2 = StoreTestUtil.availablePathById(2);
        String input3 = StoreTestUtil.availablePathById(3);
        String output = StoreTestUtil.availablePathById(0);

        List<String> inputs = ImmutableList.of(input1, input2, input3);
        List<String> outputs = ImmutableList.of(output);

        boolean useBufferFile = config.get(
                ComputerOptions.TRANSPORT_RECV_FILE_MODE);
        for (int i = 0; i < inputs.size(); i++) {
            String input = inputs.get(i);
            List<List<Integer>> data = datas.get(i);
            if (useBufferFile) {
                StoreTestUtil.bufferFileFromSubKvMap(data, input);
            } else {
                StoreTestUtil.hgkvDirFromSubKvMap(config, data, input);
            }
        }

        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                IntValue::new,
                new IntValueSumCombiner());
        OuterSortFlusher flusher = new CombineSubKvOuterSortFlusher(
                combiner, flushThreshold);
        flusher.sources(inputs.size());
        sorter.mergeInputs(inputs, flusher, outputs, true);

        /* Assert result
         * key 1 subKv 1 2 2 4
         * key 1 subKv 4 2 5 1
         * key 1 subKv 7 1
         * key 2 subKv 2 1 5 1
         * key 4 subKv 2 1 3 1
         * key 4 subKv 6 2 8 2
         */
        try (CIter<KvEntry> kvIter = sorter.iterator(outputs, true)) {
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 1, 2, 2, 4);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 4, 2, 5, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 7, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 2, 2, 1, 5, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 4, 2, 1, 3, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 4, 6, 2, 8, 2);
        }

        FileUtil.deleteFilesQuietly(inputs);
        FileUtil.deleteFilesQuietly(outputs);
    }

    @Test
    public void testExceptionCaseForSelector() {
        // Parameter inputs size < outputs size
        String input1 = StoreTestUtil.availablePathById("1");
        String input2 = StoreTestUtil.availablePathById("2");
        List<String> inputs = ImmutableList.of(input1, input2);

        String output1 = StoreTestUtil.availablePathById("3");
        String output2 = StoreTestUtil.availablePathById("4");
        String output3 = StoreTestUtil.availablePathById("5");
        List<String> outputs = ImmutableList.of(output1, output2, output3);

        InputFilesSelector selector = new DisperseEvenlySelector();

        Assert.assertThrows(IllegalArgumentException.class, () -> {
            selector.selectedByHgkvFile(inputs, outputs);
        }, e -> {
            String errorMsg = "inputs size of InputFilesSelector must be >= " +
                              "outputs size";
            Assert.assertContains(errorMsg, e.getMessage());
        });
    }
}
