/*
 * Copyright 2017 HugeGraph Authors
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */

package com.baidu.hugegraph.computer.core.sort.sorter;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.junit.BeforeClass;
import org.junit.Test;

import com.baidu.hugegraph.computer.core.combiner.OverwriteCombiner;
import com.baidu.hugegraph.computer.core.combiner.PointerCombiner;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.io.BytesInput;
import com.baidu.hugegraph.computer.core.io.BytesOutput;
import com.baidu.hugegraph.computer.core.io.IOFactory;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.sort.Sorter;
import com.baidu.hugegraph.computer.core.sort.SorterTestUtil;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.InnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.KvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.KvOuterSortFlusher;
import com.baidu.hugegraph.computer.core.store.EntryIterator;
import com.baidu.hugegraph.computer.core.store.StoreTestUtil;
import com.baidu.hugegraph.computer.core.store.buffer.KvEntriesInput;
import com.baidu.hugegraph.computer.core.store.entry.EntriesUtil;
import com.baidu.hugegraph.computer.core.store.entry.KvEntry;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
import com.baidu.hugegraph.testutil.Assert;
import com.google.common.collect.ImmutableList;

public class FlusherTest {

    private static Config CONFIG;

    @BeforeClass
    public static void init() {
        CONFIG = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
    }

    @Test
    public void testKvInnerSortFlusher() throws Exception {
        List<Integer> map = ImmutableList.of(2, 1,
                                             3, 1,
                                             2, 1,
                                             4, 1);
        BytesInput input = SorterTestUtil.inputFromKvMap(map);

        BytesOutput output = IOFactory.createBytesOutput(
                             Constants.SMALL_BUF_SIZE);
        Sorter sorter = SorterTestUtil.createSorter(CONFIG);
        sorter.sortBuffer(input, new KvInnerSortFlusher(output), false);

        BytesInput result = EntriesUtil.inputFromOutput(output);
        EntryIterator iter = new KvEntriesInput(result);
        SorterTestUtil.assertKvEntry(iter.next(), 2, 1);
        SorterTestUtil.assertKvEntry(iter.next(), 2, 1);
        SorterTestUtil.assertKvEntry(iter.next(), 3, 1);
        SorterTestUtil.assertKvEntry(iter.next(), 4, 1);
        iter.close();
    }

    @Test
    public void testKvOuterSortFlusher() throws Exception {
        List<Integer> map1 = ImmutableList.of(2, 1,
                                              2, 1,
                                              3, 1,
                                              4, 1);
        List<Integer> map2 = ImmutableList.of(1, 1,
                                              3, 1,
                                              6, 1);
        BytesInput input1 = SorterTestUtil.inputFromKvMap(map1);
        BytesInput input2 = SorterTestUtil.inputFromKvMap(map2);
        List<RandomAccessInput> inputs = ImmutableList.of(input1, input2);

        String resultFile = StoreTestUtil.availablePathById("1");
        Sorter sorter = SorterTestUtil.createSorter(CONFIG);
        sorter.mergeBuffers(inputs, new KvOuterSortFlusher(), resultFile,
                            false);

        ImmutableList<String> outputs = ImmutableList.of(resultFile);
        Iterator<KvEntry> iter = sorter.iterator(outputs, false);
        SorterTestUtil.assertKvEntry(iter.next(), 1, 1);
        SorterTestUtil.assertKvEntry(iter.next(), 2, 1);
        SorterTestUtil.assertKvEntry(iter.next(), 2, 1);
        SorterTestUtil.assertKvEntry(iter.next(), 3, 1);
        SorterTestUtil.assertKvEntry(iter.next(), 3, 1);
        SorterTestUtil.assertKvEntry(iter.next(), 4, 1);
        SorterTestUtil.assertKvEntry(iter.next(), 6, 1);
    }

    @Test
    public void testExceptionCaseForFlusher() {
        BytesOutput output = IOFactory.createBytesOutput(
                             Constants.SMALL_BUF_SIZE);
        InnerSortFlusher flusher = new KvInnerSortFlusher(output);
        List<KvEntry> entries = new ArrayList<>();

        Assert.assertThrows(IllegalArgumentException.class, () -> {
            flusher.flush(entries.iterator());
        }, e -> {
            String errorMsg = "Parameter entries can't be empty";
            Assert.assertContains(errorMsg, e.getMessage());
        });
    }

    @Test
    public void testOverwriteCombiner() throws Exception {
        List<Integer> data = ImmutableList.of(1, 2,
                                              3, 5,
                                              1, 3,
                                              1, 1,
                                              3, 4);
        BytesInput input = SorterTestUtil.inputFromKvMap(data);

        BytesOutput output = IOFactory.createBytesOutput(
                             Constants.SMALL_BUF_SIZE);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                                  IntValue::new,
                                                  new OverwriteCombiner<>());
        InnerSortFlusher flusher = new CombineKvInnerSortFlusher(output,
                                                                 combiner);
        Sorter sorter = SorterTestUtil.createSorter(CONFIG);
        sorter.sortBuffer(input, flusher, false);

        BytesInput result = EntriesUtil.inputFromOutput(output);
        // Assert result
        KvEntriesInput iter = new KvEntriesInput(result);
        SorterTestUtil.assertKvEntry(iter.next(), 1, 1);
        SorterTestUtil.assertKvEntry(iter.next(), 3, 4);
        iter.close();
    }
}
