blob: fcd359c099e986cd8dd4e8c096f460a691a2fc66 [file] [log] [blame]
/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package com.baidu.hugegraph.computer.core.sort.sorter;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.baidu.hugegraph.computer.core.combiner.IntValueSumCombiner;
import com.baidu.hugegraph.computer.core.combiner.PointerCombiner;
import com.baidu.hugegraph.computer.core.common.Constants;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.computer.core.graph.value.IntValue;
import com.baidu.hugegraph.computer.core.io.BytesInput;
import com.baidu.hugegraph.computer.core.io.BytesOutput;
import com.baidu.hugegraph.computer.core.io.IOFactory;
import com.baidu.hugegraph.computer.core.io.RandomAccessInput;
import com.baidu.hugegraph.computer.core.sort.Sorter;
import com.baidu.hugegraph.computer.core.sort.SorterTestUtil;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineKvOuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineSubKvInnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.CombineSubKvOuterSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.InnerSortFlusher;
import com.baidu.hugegraph.computer.core.sort.flusher.OuterSortFlusher;
import com.baidu.hugegraph.computer.core.store.EntryIterator;
import com.baidu.hugegraph.computer.core.store.KvEntryFileReader;
import com.baidu.hugegraph.computer.core.store.StoreTestUtil;
import com.baidu.hugegraph.computer.core.store.buffer.KvEntriesInput;
import com.baidu.hugegraph.computer.core.store.entry.EntriesUtil;
import com.baidu.hugegraph.computer.core.store.entry.KvEntry;
import com.baidu.hugegraph.computer.core.store.file.hgkvfile.HgkvDir;
import com.baidu.hugegraph.computer.core.store.file.hgkvfile.HgkvDirImpl;
import com.baidu.hugegraph.computer.core.store.file.hgkvfile.reader.HgkvDirReaderImpl;
import com.baidu.hugegraph.computer.core.store.file.select.DisperseEvenlySelector;
import com.baidu.hugegraph.computer.core.store.file.select.InputFilesSelector;
import com.baidu.hugegraph.computer.core.util.FileUtil;
import com.baidu.hugegraph.computer.suite.unit.UnitTestBase;
import com.baidu.hugegraph.iterator.CIter;
import com.baidu.hugegraph.testutil.Assert;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
 * Unit tests for {@link Sorter}. They cover:
 * <ul>
 * <li>sorting one kv / subKv buffer with an inner flusher;</li>
 * <li>merging several buffers or files with an outer flusher;</li>
 * <li>the argument check of {@link DisperseEvenlySelector}.</li>
 * </ul>
 *
 * <p>Every iterator opened on a sort result is closed with
 * try-with-resources, so its handle is released even when an assertion
 * fails. Each result is also checked for exhaustion, so unexpected trailing
 * entries fail the test.
 */
public class SorterTest {

    @Before
    public void setup() throws IOException {
        FileUtils.deleteDirectory(new File(StoreTestUtil.FILE_DIR));
    }

    @After
    public void teardown() throws IOException {
        FileUtils.deleteDirectory(new File(StoreTestUtil.FILE_DIR));
    }

    /**
     * Sorts one unsorted kv buffer with duplicate keys. Values of equal keys
     * must be summed by {@link IntValueSumCombiner}.
     */
    @Test
    public void testSortKvBuffer() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MAX_FILE_SIZE, "32",
                ComputerOptions.HGKV_DATABLOCK_SIZE, "16",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "3"
        );
        List<Integer> map = ImmutableList.of(2, 3,
                                             1, 23,
                                             6, 2,
                                             5, 9,
                                             2, 2,
                                             6, 1,
                                             1, 20);
        BytesInput input = SorterTestUtil.inputFromKvMap(map);
        BytesOutput output = IOFactory.createBytesOutput(
                             Constants.SMALL_BUF_SIZE);
        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                   IntValue::new,
                                   new IntValueSumCombiner());
        sorter.sortBuffer(input,
                          new CombineKvInnerSortFlusher(output, combiner),
                          false);

        BytesInput resultInput = EntriesUtil.inputFromOutput(output);
        // Close the iterator even if an assertion below fails
        try (KvEntriesInput iter = new KvEntriesInput(resultInput)) {
            SorterTestUtil.assertKvEntry(iter.next(), 1, 43);
            SorterTestUtil.assertKvEntry(iter.next(), 2, 5);
            SorterTestUtil.assertKvEntry(iter.next(), 5, 9);
            SorterTestUtil.assertKvEntry(iter.next(), 6, 3);
            Assert.assertFalse(iter.hasNext());
        }
    }

    /**
     * Merges four sorted kv buffers into one hgkv dir, combining the values
     * of equal keys.
     */
    @Test
    public void testSortKvBuffers() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MAX_FILE_SIZE, "32",
                ComputerOptions.HGKV_DATABLOCK_SIZE, "16",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "3",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
        List<Integer> map1 = ImmutableList.of(2, 3,
                                              2, 1,
                                              5, 2,
                                              6, 9,
                                              6, 2);
        List<Integer> map2 = ImmutableList.of(1, 3,
                                              1, 1,
                                              3, 2,
                                              6, 9,
                                              8, 2);
        String path = StoreTestUtil.availablePathById("1");

        // Merge 4 sorted inputs
        List<RandomAccessInput> inputs = ImmutableList.of(
                                SorterTestUtil.inputFromKvMap(map1),
                                SorterTestUtil.inputFromKvMap(map2),
                                SorterTestUtil.inputFromKvMap(map1),
                                SorterTestUtil.inputFromKvMap(map2));
        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                   IntValue::new,
                                   new IntValueSumCombiner());
        sorter.mergeBuffers(inputs, new CombineKvOuterSortFlusher(combiner),
                            path, false);

        // Assert merge result from target hgkvDir
        KvEntryFileReader reader = new HgkvDirReaderImpl(path, false);
        // The original never closed this iterator, leaking the file handle
        try (EntryIterator iter = reader.iterator()) {
            SorterTestUtil.assertKvEntry(iter.next(), 1, 8);
            SorterTestUtil.assertKvEntry(iter.next(), 2, 8);
            SorterTestUtil.assertKvEntry(iter.next(), 3, 4);
            SorterTestUtil.assertKvEntry(iter.next(), 5, 4);
            SorterTestUtil.assertKvEntry(iter.next(), 6, 40);
            SorterTestUtil.assertKvEntry(iter.next(), 8, 4);
            Assert.assertFalse(iter.hasNext());
        }
    }

    @Test
    public void testMergeKvInputs() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MAX_FILE_SIZE, "32",
                ComputerOptions.HGKV_DATABLOCK_SIZE, "16",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "3",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
        this.testMergeKvInputs(config);

        config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.HGKV_MAX_FILE_SIZE, "32",
                ComputerOptions.HGKV_DATABLOCK_SIZE, "16",
                ComputerOptions.HGKV_MERGE_FILES_NUM, "3",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "true"
        );
        this.testMergeKvInputs(config);
    }

    /**
     * Writes ten kv input files into two outputs and merges them, then
     * verifies the summed values per key.
     *
     * <p>Inputs alternate between two maps. Each is written as a buffer file
     * or an hgkv dir, depending on
     * {@link ComputerOptions#TRANSPORT_RECV_FILE_MODE}.
     *
     * @param config the sort configuration under test
     */
    private void testMergeKvInputs(Config config) throws Exception {
        List<Integer> map1 = ImmutableList.of(2, 3,
                                              2, 1,
                                              5, 2,
                                              6, 9,
                                              6, 2);
        List<Integer> map2 = ImmutableList.of(1, 3,
                                              1, 2,
                                              3, 2);

        // Input hgkvDirs
        String file1Name = StoreTestUtil.availablePathById("1");
        String file2Name = StoreTestUtil.availablePathById("2");
        String file3Name = StoreTestUtil.availablePathById("3");
        String file4Name = StoreTestUtil.availablePathById("4");
        String file5Name = StoreTestUtil.availablePathById("5");
        String file6Name = StoreTestUtil.availablePathById("6");
        String file7Name = StoreTestUtil.availablePathById("7");
        String file8Name = StoreTestUtil.availablePathById("8");
        String file9Name = StoreTestUtil.availablePathById("9");
        String file10Name = StoreTestUtil.availablePathById("10");

        List<String> inputs = Lists.newArrayList(file1Name, file2Name,
                                                 file3Name, file4Name,
                                                 file5Name, file6Name,
                                                 file7Name, file8Name,
                                                 file9Name, file10Name);
        // Output hgkvDirs
        String output1 = StoreTestUtil.availablePathById("20");
        String output2 = StoreTestUtil.availablePathById("21");
        List<String> outputs = ImmutableList.of(output1, output2);

        for (int i = 0; i < inputs.size(); i++) {
            List<Integer> map;
            if ((i & 1) == 0) {
                map = map1;
            } else {
                map = map2;
            }
            if (config.get(ComputerOptions.TRANSPORT_RECV_FILE_MODE)) {
                StoreTestUtil.bufferFileFromKvMap(map, inputs.get(i));
            } else {
                StoreTestUtil.hgkvDirFromKvMap(config, map, inputs.get(i));
            }
        }

        // Merge file
        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                   IntValue::new,
                                   new IntValueSumCombiner());
        sorter.mergeInputs(inputs, new CombineKvOuterSortFlusher(combiner),
                           outputs, false);

        // Assert sort result
        List<Integer> result = ImmutableList.of(1, 25,
                                                2, 20,
                                                3, 10,
                                                5, 10,
                                                6, 55);
        Iterator<Integer> resultIter = result.iterator();
        /*
         * The iterator must be closed before the output files are deleted
         * below. The second invocation of this method reuses the same paths.
         */
        try (CIter<KvEntry> iterator = sorter.iterator(outputs, false)) {
            KvEntry last = iterator.next();
            int value = StoreTestUtil.dataFromPointer(last.value());
            while (true) {
                KvEntry current = null;
                if (iterator.hasNext()) {
                    current = iterator.next();
                    // The two outputs may each hold the same key: sum them
                    if (last.compareTo(current) == 0) {
                        value += StoreTestUtil.dataFromPointer(
                                 current.value());
                        continue;
                    }
                }

                Assert.assertEquals(StoreTestUtil.dataFromPointer(last.key()),
                                    resultIter.next());
                Assert.assertEquals(value, resultIter.next());

                if (current == null) {
                    break;
                }

                last = current;
                value = StoreTestUtil.dataFromPointer(last.value());
            }
        }
        Assert.assertFalse(resultIter.hasNext());

        FileUtil.deleteFilesQuietly(inputs);
        FileUtil.deleteFilesQuietly(outputs);
    }

    /**
     * Builds an unsorted subKv buffer and sorts it with a
     * {@link CombineSubKvInnerSortFlusher}. The flusher's threshold is
     * {@link ComputerOptions#INPUT_MAX_EDGES_IN_ONE_VERTEX}.
     *
     * @param config the sort configuration providing the flush threshold
     * @return the sorted and combined entries, readable as an input
     */
    private BytesInput sortedSubKvBuffer(Config config) throws Exception {
        List<Integer> kv1 = ImmutableList.of(3,
                                             2, 1,
                                             4, 1);
        List<Integer> kv2 = ImmutableList.of(1,
                                             3, 1,
                                             5, 1);
        List<Integer> kv3 = ImmutableList.of(2,
                                             8, 1,
                                             9, 1);
        List<Integer> kv4 = ImmutableList.of(3,
                                             2, 1,
                                             3, 1);
        List<Integer> kv5 = ImmutableList.of(2,
                                             5, 1,
                                             8, 1);
        List<List<Integer>> data = ImmutableList.of(kv1, kv2, kv3, kv4, kv5);

        BytesInput input = SorterTestUtil.inputFromSubKvMap(data);
        BytesOutput output = IOFactory.createBytesOutput(
                             Constants.SMALL_BUF_SIZE);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                   IntValue::new,
                                   new IntValueSumCombiner());
        int flushThreshold = config.get(
                             ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);
        InnerSortFlusher flusher = new CombineSubKvInnerSortFlusher(
                                   output, combiner, flushThreshold);

        Sorter sorter = SorterTestUtil.createSorter(config);
        sorter.sortBuffer(input, flusher, true);

        return EntriesUtil.inputFromOutput(output);
    }

    @Test
    public void testSortSubKvBuffer() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "2"
        );

        /*
         * Assert result
         * key 1 subKv 3 1, 5 1
         * key 2 subKv 5 1, 8 2
         * key 2 subKv 9 1
         * key 3 subKv 2 2, 3 1
         * key 3 subKv 4 1
         */
        BytesInput input = this.sortedSubKvBuffer(config);
        try (EntryIterator iter = new KvEntriesInput(input, true)) {
            SorterTestUtil.assertSubKvByKv(iter.next(), 1, 3, 1, 5, 1);
            SorterTestUtil.assertSubKvByKv(iter.next(), 2, 5, 1, 8, 2);
            SorterTestUtil.assertSubKvByKv(iter.next(), 2, 9, 1);
            SorterTestUtil.assertSubKvByKv(iter.next(), 3, 2, 2, 3, 1);
            SorterTestUtil.assertSubKvByKv(iter.next(), 3, 4, 1);
            Assert.assertFalse(iter.hasNext());
        }
    }

    @Test
    public void testSortSubKvBuffers() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "2",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
        int flushThreshold = config.get(
                             ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);

        BytesInput i1 = this.sortedSubKvBuffer(config);
        BytesInput i2 = this.sortedSubKvBuffer(config);
        BytesInput i3 = this.sortedSubKvBuffer(config);
        List<RandomAccessInput> buffers = ImmutableList.of(i1, i2, i3);

        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                   IntValue::new,
                                   new IntValueSumCombiner());
        OuterSortFlusher flusher = new CombineSubKvOuterSortFlusher(
                                   combiner, flushThreshold);
        flusher.sources(buffers.size());

        String outputFile = StoreTestUtil.availablePathById("1");
        sorter.mergeBuffers(buffers, flusher, outputFile, true);

        /*
         * Assert result
         * key 1 subKv 3 3, 5 3
         * key 2 subKv 5 3, 8 6
         * key 2 subKv 9 3
         * key 3 subKv 2 6, 3 3
         * key 3 subKv 4 3
         */
        List<String> outputs = ImmutableList.of(outputFile);
        try (CIter<KvEntry> kvIter = sorter.iterator(outputs, true)) {
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 3, 3, 5, 3);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 2, 5, 3, 8, 6);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 2, 9, 3);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 3, 2, 6, 3, 3);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 3, 4, 3);
            Assert.assertFalse(kvIter.hasNext());
        }

        // Assert file properties
        HgkvDir dir = HgkvDirImpl.open(outputFile);
        Assert.assertEquals(5, dir.numEntries());
        Assert.assertEquals(8, dir.numSubEntries());
    }

    @Test
    public void testMergeSubKvFiles() throws Exception {
        Config config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "2",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "false"
        );
        this.testMergeSubKvFiles(config);

        config = UnitTestBase.updateWithRequiredOptions(
                ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX, "2",
                ComputerOptions.TRANSPORT_RECV_FILE_MODE, "true"
        );
        this.testMergeSubKvFiles(config);
    }

    /**
     * Writes three sorted subKv input files and merges them into one output,
     * then verifies the combined subKvs.
     *
     * <p>The outer flusher splits the combined subKvs into chunks of at most
     * {@link ComputerOptions#INPUT_MAX_EDGES_IN_ONE_VERTEX}.
     *
     * @param config the sort configuration under test
     */
    private void testMergeSubKvFiles(Config config) throws Exception {
        int flushThreshold = config.get(
                             ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);

        List<Integer> kv1 = ImmutableList.of(1,
                                             2, 1,
                                             4, 1);
        List<Integer> kv2 = ImmutableList.of(4,
                                             2, 1,
                                             3, 1);
        List<Integer> kv3 = ImmutableList.of(4,
                                             6, 1,
                                             8, 1);
        List<Integer> kv4 = ImmutableList.of(1,
                                             1, 1,
                                             2, 1);
        List<Integer> kv5 = ImmutableList.of(1,
                                             5, 1,
                                             7, 1);
        List<Integer> kv6 = ImmutableList.of(2,
                                             2, 1,
                                             5, 1);
        List<List<Integer>> data1 = ImmutableList.of(kv1, kv2, kv3);
        List<List<Integer>> data2 = ImmutableList.of(kv4, kv5, kv6);
        List<List<Integer>> data3 = ImmutableList.of(kv4, kv1, kv3);
        List<List<List<Integer>>> datas = ImmutableList.of(data1, data2, data3);

        String input1 = StoreTestUtil.availablePathById(1);
        String input2 = StoreTestUtil.availablePathById(2);
        String input3 = StoreTestUtil.availablePathById(3);
        String output = StoreTestUtil.availablePathById(0);

        List<String> inputs = ImmutableList.of(input1, input2, input3);
        List<String> outputs = ImmutableList.of(output);

        boolean useBufferFile = config.get(
                                ComputerOptions.TRANSPORT_RECV_FILE_MODE);
        for (int i = 0; i < inputs.size(); i++) {
            String input = inputs.get(i);
            List<List<Integer>> data = datas.get(i);
            if (useBufferFile) {
                StoreTestUtil.bufferFileFromSubKvMap(data, input);
            } else {
                StoreTestUtil.hgkvDirFromSubKvMap(config, data, input);
            }
        }

        Sorter sorter = SorterTestUtil.createSorter(config);
        PointerCombiner combiner = SorterTestUtil.createPointerCombiner(
                                   IntValue::new,
                                   new IntValueSumCombiner());
        OuterSortFlusher flusher = new CombineSubKvOuterSortFlusher(
                                   combiner, flushThreshold);
        flusher.sources(inputs.size());
        sorter.mergeInputs(inputs, flusher, outputs, true);

        /*
         * Assert result
         * key 1 subKv 1 2, 2 4
         * key 1 subKv 4 2, 5 1
         * key 1 subKv 7 1
         * key 2 subKv 2 1, 5 1
         * key 4 subKv 2 1, 3 1
         * key 4 subKv 6 2, 8 2
         */
        try (CIter<KvEntry> kvIter = sorter.iterator(outputs, true)) {
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 1, 2, 2, 4);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 4, 2, 5, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 1, 7, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 2, 2, 1, 5, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 4, 2, 1, 3, 1);
            SorterTestUtil.assertSubKvByKv(kvIter.next(), 4, 6, 2, 8, 2);
            Assert.assertFalse(kvIter.hasNext());
        }

        FileUtil.deleteFilesQuietly(inputs);
        FileUtil.deleteFilesQuietly(outputs);
    }

    /**
     * A selector must reject more outputs than inputs with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testExceptionCaseForSelector() {
        // Parameter inputs size < outputs size
        String input1 = StoreTestUtil.availablePathById("1");
        String input2 = StoreTestUtil.availablePathById("2");
        List<String> inputs = ImmutableList.of(input1, input2);

        String output1 = StoreTestUtil.availablePathById("3");
        String output2 = StoreTestUtil.availablePathById("4");
        String output3 = StoreTestUtil.availablePathById("5");
        List<String> outputs = ImmutableList.of(output1, output2, output3);

        InputFilesSelector selector = new DisperseEvenlySelector();

        Assert.assertThrows(IllegalArgumentException.class, () -> {
            selector.selectedByHgkvFile(inputs, outputs);
        }, e -> {
            String errorMsg = "inputs size of InputFilesSelector must be >= " +
                              "outputs size";
            Assert.assertContains(errorMsg, e.getMessage());
        });
    }
}