blob: 1d8436a968a677544cfa2fc8813aae2f2587a745 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.computer.core.sort.sorter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.junit.BeforeClass;
import org.junit.Test;
import com.baidu.hugegraph.computer.core.combiner.OverwriteCombiner;
import com.baidu.hugegraph.computer.core.combiner.PointerCombiner;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.io.BytesInput;
import com.baidu.hugegraph.computer.core.io.BytesOutput;
import com.baidu.hugegraph.computer.core.io.IOFactory;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.sort.Sorter;
import com.baidu.hugegraph.computer.core.sort.SorterTestUtil;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.InnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.KvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.KvOuterSortFlusher;
import com.baidu.hugegraph.computer.core.store.EntryIterator;
import com.baidu.hugegraph.computer.core.store.StoreTestUtil;
import com.baidu.hugegraph.computer.core.store.buffer.KvEntriesInput;
import com.baidu.hugegraph.computer.core.store.entry.EntriesUtil;
import com.baidu.hugegraph.computer.core.store.entry.KvEntry;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
import com.baidu.hugegraph.testutil.Assert;
import com.google.common.collect.ImmutableList;
public class FlusherTest {
private static Config CONFIG;
@BeforeClass
public static void init() {
CONFIG = UnitTestBase.updateWithRequiredOptions(
ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
);
}
@Test
public void testKvInnerSortFlusher() throws Exception {
List<Integer> map = ImmutableList.of(2, 1,
3, 1,
2, 1,
4, 1);
BytesInput input = SorterTestUtil.inputFromKvMap(map);
BytesOutput output = IOFactory.createBytesOutput(
Constants.SMALL_BUF_SIZE);
Sorter sorter = SorterTestUtil.createSorter(CONFIG);
sorter.sortBuffer(input, new KvInnerSortFlusher(output), false);
BytesInput result = EntriesUtil.inputFromOutput(output);
EntryIterator iter = new KvEntriesInput(result);
SorterTestUtil.assertKvEntry(iter.next(), 2, 1);
SorterTestUtil.assertKvEntry(iter.next(), 2, 1);
SorterTestUtil.assertKvEntry(iter.next(), 3, 1);
SorterTestUtil.assertKvEntry(iter.next(), 4, 1);
iter.close();
}
@Test
public void testKvOuterSortFlusher() throws Exception {
List<Integer> map1 = ImmutableList.of(2, 1,
2, 1,
3, 1,
4, 1);
List<Integer> map2 = ImmutableList.of(1, 1,
3, 1,
6, 1);
BytesInput input1 = SorterTestUtil.inputFromKvMap(map1);
BytesInput input2 = SorterTestUtil.inputFromKvMap(map2);
List<RandomAccessInput> inputs = ImmutableList.of(input1, input2);
String resultFile = StoreTestUtil.availablePathById("1");
Sorter sorter = SorterTestUtil.createSorter(CONFIG);
sorter.mergeBuffers(inputs, new KvOuterSortFlusher(), resultFile,
false);
ImmutableList<String> outputs = ImmutableList.of(resultFile);
Iterator<KvEntry> iter = sorter.iterator(outputs, false);
SorterTestUtil.assertKvEntry(iter.next(), 1, 1);
SorterTestUtil.assertKvEntry(iter.next(), 2, 1);
SorterTestUtil.assertKvEntry(iter.next(), 2, 1);
SorterTestUtil.assertKvEntry(iter.next(), 3, 1);
SorterTestUtil.assertKvEntry(iter.next(), 3, 1);
SorterTestUtil.assertKvEntry(iter.next(), 4, 1);
SorterTestUtil.assertKvEntry(iter.next(), 6, 1);
}
@Test
public void testExceptionCaseForFlusher() {
BytesOutput output = IOFactory.createBytesOutput(
Constants.SMALL_BUF_SIZE);
InnerSortFlusher flusher = new KvInnerSortFlusher(output);
List<KvEntry> entries = new ArrayList<>();
Assert.assertThrows(IllegalArgumentException.class, () -> {
flusher.flush(entries.iterator());
}, e -> {
String errorMsg = "Parameter entries can't be empty";
Assert.assertContains(errorMsg, e.getMessage());
});
}
@Test
public void testOverwriteCombiner() throws Exception {
List<Integer> data = ImmutableList.of(1, 2,
3, 5,
1, 3,
1, 1,
3, 4);
BytesInput input = SorterTestUtil.inputFromKvMap(data);
BytesOutput output = IOFactory.createBytesOutput(
Constants.SMALL_BUF_SIZE);
PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
IntValue::new,
new OverwriteCombiner<>());
InnerSortFlusher flusher = new CombineKvInnerSortFlusher(output,
combiner);
Sorter sorter = SorterTestUtil.createSorter(CONFIG);
sorter.sortBuffer(input, flusher, false);
BytesInput result = EntriesUtil.inputFromOutput(output);
// Assert result
KvEntriesInput iter = new KvEntriesInput(result);
SorterTestUtil.assertKvEntry(iter.next(), 1, 1);
SorterTestUtil.assertKvEntry(iter.next(), 3, 4);
iter.close();
}
}