blob: c68d59deab4a7ee9bcc9aa12ebc824d8637851d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.tests.unit;
import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.ComparatorFactories;
import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.GRandom;
import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.RecordDesc;
import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.SortFields;
import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.generateRandomRecord;
import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.matchResult;
import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.prepareData;
import static org.apache.hyracks.tests.unit.AbstractRunGeneratorTest.testUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.DataInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hyracks.api.comm.FrameHelper;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.comm.IFrameReader;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
import org.apache.hyracks.dataflow.std.sort.Algorithm;
import org.apache.hyracks.dataflow.std.sort.ExternalSortRunGenerator;
import org.apache.hyracks.dataflow.std.sort.RunMergingFrameReader;
import org.apache.hyracks.dataflow.std.sort.util.GroupVSizeFrame;
import org.junit.Test;
import junit.extensions.PA;
public class RunMergingFrameReaderTest {
static IBinaryComparator[] Comparators = new IBinaryComparator[] { ComparatorFactories[0].createBinaryComparator(),
ComparatorFactories[1].createBinaryComparator(), };
static class TestFrameReader implements IFrameReader {
private final int pageSize;
private final int numFrames;
private final int minRecordSize;
private final int maxRecordSize;
private TreeMap<Integer, String> result = new TreeMap<>();
int maxFrameSize;
ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
FrameTupleAppender appender = new FrameTupleAppender();
private Iterator<Map.Entry<Integer, String>> iterator;
private Map.Entry<Integer, String> lastEntry;
TestFrameReader(int pageSize, int numFrames, int minRecordSize, int maxRecordSize) {
this.pageSize = pageSize;
this.numFrames = numFrames;
this.minRecordSize = minRecordSize;
this.maxRecordSize = maxRecordSize;
this.maxFrameSize = pageSize;
}
@Override
public void open() throws HyracksDataException {
result.clear();
int maxTupleSize = prepareSortedData(numFrames * pageSize, minRecordSize, maxRecordSize, null, result);
maxFrameSize = FrameHelper.calcAlignedFrameSizeToStore(0, maxTupleSize, pageSize);
iterator = result.entrySet().iterator();
}
@Override
public boolean nextFrame(IFrame frame) throws HyracksDataException {
if (lastEntry == null && !iterator.hasNext()) {
return false;
}
if (lastEntry == null) {
lastEntry = iterator.next();
}
appender.reset(frame, true);
while (true) {
tb.reset();
tb.addField(IntegerSerializerDeserializer.INSTANCE, lastEntry.getKey());
tb.addField(new UTF8StringSerializerDeserializer(), lastEntry.getValue());
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
break;
} else {
if (iterator.hasNext()) {
lastEntry = iterator.next();
} else {
lastEntry = null;
break;
}
}
}
// printFrame(frame.getBuffer());
return true;
}
@Override
public void close() throws HyracksDataException {
}
}
static int prepareSortedData(int minDataSize, int minRecordSize, int maxRecordSize,
Map<Integer, String> specialData, Map<Integer, String> result) throws HyracksDataException {
ArrayTupleBuilder tb = new ArrayTupleBuilder(RecordDesc.getFieldCount());
int datasize = 0;
int maxtuple = 0;
if (specialData != null) {
for (Map.Entry<Integer, String> entry : specialData.entrySet()) {
tb.reset();
tb.addField(IntegerSerializerDeserializer.INSTANCE, entry.getKey());
tb.addField(new UTF8StringSerializerDeserializer(), entry.getValue());
int size = tb.getSize() + tb.getFieldEndOffsets().length * 4;
datasize += size;
if (size > maxtuple) {
maxtuple = size;
}
}
result.putAll(specialData);
}
while (datasize < minDataSize) {
String value = generateRandomRecord(minRecordSize, maxRecordSize);
tb.reset();
int key = GRandom.nextInt(datasize + 1);
if (!result.containsKey(key)) {
tb.addField(IntegerSerializerDeserializer.INSTANCE, key);
tb.addField(new UTF8StringSerializerDeserializer(), value);
int size = tb.getSize() + tb.getFieldEndOffsets().length * 4;
datasize += size;
if (size > maxtuple) {
maxtuple = size;
}
if (datasize < minDataSize) {
result.put(key, value);
}
}
}
return maxtuple;
}
@Test
public void testOnlyOneRunShouldMerge() throws HyracksDataException {
int pageSize = 128;
int numRuns = 1;
int numFramesPerRun = 1;
int minRecordSize = pageSize / 10;
int maxRecordSize = pageSize / 8;
IHyracksTaskContext ctx = testUtils.create(pageSize);
List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
List<TestFrameReader> readerList = new ArrayList<>(numRuns);
List<IFrame> frameList = new ArrayList<>(numRuns);
prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
frameList, keyValueMapList);
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
null, RecordDesc);
testMergeSucceed(ctx, reader, keyValueMapList);
}
@Test
public void testNormalRunMerge() throws HyracksDataException {
int pageSize = 128;
int numRuns = 2;
int numFramesPerRun = 2;
int minRecordSize = pageSize / 10;
int maxRecordSize = pageSize / 8;
IHyracksTaskContext ctx = testUtils.create(pageSize);
List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
List<TestFrameReader> readerList = new ArrayList<>(numRuns);
List<IFrame> frameList = new ArrayList<>(numRuns);
prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
frameList, keyValueMapList);
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
null, RecordDesc);
testMergeSucceed(ctx, reader, keyValueMapList);
}
@Test
public void testNormalRunMergeWithTopK() throws HyracksDataException {
int pageSize = 128;
int numRuns = 2;
int numFramesPerRun = 2;
int minRecordSize = pageSize / 10;
int maxRecordSize = pageSize / 8;
for (int topK = 1; topK < pageSize * numRuns * numFramesPerRun / maxRecordSize / 2; topK++) {
IHyracksTaskContext ctx = testUtils.create(pageSize);
List<Map<Integer, String>> keyValueMapList = new ArrayList<>(numRuns);
List<TestFrameReader> readerList = new ArrayList<>(numRuns);
List<IFrame> frameList = new ArrayList<>(numRuns);
prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
frameList, keyValueMapList);
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields,
Comparators, null, RecordDesc, topK);
int totoalCount = testMergeSucceedInner(ctx, reader, keyValueMapList);
int newCount = 0;
for (Map<Integer, String> x : keyValueMapList) {
newCount += x.size();
}
assertEquals(topK + newCount, totoalCount);
}
}
private void testMergeSucceed(IHyracksTaskContext ctx, RunMergingFrameReader reader,
List<Map<Integer, String>> keyValueMapList) throws HyracksDataException {
testMergeSucceedInner(ctx, reader, keyValueMapList);
assertAllKeyValueIsConsumed(keyValueMapList);
reader.close();
}
private int testMergeSucceedInner(IHyracksTaskContext ctx, RunMergingFrameReader reader,
List<Map<Integer, String>> keyValueMapList) throws HyracksDataException {
IFrame frame = new VSizeFrame(ctx);
reader.open();
int count = 0;
for (int i = 0; i < keyValueMapList.size(); i++) {
keyValueMapList.set(i, new TreeMap<>(keyValueMapList.get(i)));
count += keyValueMapList.get(i).size();
}
while (reader.nextFrame(frame)) {
assertFrameIsSorted(frame, keyValueMapList);
}
return count;
}
@Test
public void testOneLargeRunMerge() throws HyracksDataException {
int pageSize = 64;
int numRuns = 2;
int numFramesPerRun = 1;
int minRecordSize = pageSize / 10;
int maxRecordSize = pageSize / 8;
IHyracksTaskContext ctx = testUtils.create(pageSize);
List<Map<Integer, String>> keyValueMap = new ArrayList<>();
List<TestFrameReader> readerList = new ArrayList<>();
List<IFrame> frameList = new ArrayList<>();
prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
frameList, keyValueMap);
minRecordSize = pageSize;
maxRecordSize = pageSize;
numFramesPerRun = 4;
prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
frameList, keyValueMap);
minRecordSize = pageSize * 2;
maxRecordSize = pageSize * 2;
numFramesPerRun = 6;
prepareRandomInputRunList(ctx, pageSize, numRuns, numFramesPerRun, minRecordSize, maxRecordSize, readerList,
frameList, keyValueMap);
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, readerList, frameList, SortFields, Comparators,
null, RecordDesc);
testMergeSucceed(ctx, reader, keyValueMap);
}
@Test
public void testRunFileReader() throws HyracksDataException {
int pageSize = 128;
int numRuns = 4;
int numFramesPerRun = 4;
int minRecordSize = pageSize / 10;
int maxRecordSize = pageSize / 2;
IHyracksTaskContext ctx = testUtils.create(pageSize);
ExternalSortRunGenerator runGenerator = new ExternalSortRunGenerator(ctx, SortFields, null, ComparatorFactories,
RecordDesc, Algorithm.MERGE_SORT, numFramesPerRun);
runGenerator.open();
Map<Integer, String> keyValuePair = new HashMap<>();
List<IFrame> frameList = new ArrayList<>();
prepareData(ctx, frameList, pageSize * numFramesPerRun * numRuns, minRecordSize, maxRecordSize, null,
keyValuePair);
for (IFrame frame : frameList) {
runGenerator.nextFrame(frame.getBuffer());
}
numFramesPerRun = 2;
minRecordSize = pageSize;
maxRecordSize = pageSize;
frameList.clear();
prepareData(ctx, frameList, pageSize * numFramesPerRun * numRuns, minRecordSize, maxRecordSize, null,
keyValuePair);
for (IFrame frame : frameList) {
runGenerator.nextFrame(frame.getBuffer());
}
runGenerator.close();
List<IFrame> inFrame = new ArrayList<>(runGenerator.getRuns().size());
for (GeneratedRunFileReader max : runGenerator.getRuns()) {
inFrame.add(new GroupVSizeFrame(ctx, max.getMaxFrameSize()));
}
// Let each run file reader not delete the run file when it is read and closed.
for (GeneratedRunFileReader run : runGenerator.getRuns()) {
PA.setValue(run, "deleteAfterClose", false);
}
matchResult(ctx, runGenerator.getRuns(), keyValuePair);
List<IFrameReader> runs = new ArrayList<>();
for (GeneratedRunFileReader run : runGenerator.getRuns()) {
runs.add(run);
}
RunMergingFrameReader reader = new RunMergingFrameReader(ctx, runs, inFrame, SortFields, Comparators, null,
RecordDesc);
IFrame outFrame = new VSizeFrame(ctx);
reader.open();
while (reader.nextFrame(outFrame)) {
assertFrameIsSorted(outFrame, Arrays.asList(keyValuePair));
}
reader.close();
assertAllKeyValueIsConsumed(Arrays.asList(keyValuePair));
}
private void assertAllKeyValueIsConsumed(List<Map<Integer, String>> keyValueMapList) {
for (Map<Integer, String> map : keyValueMapList) {
assertTrue(map.isEmpty());
}
}
private void assertFrameIsSorted(IFrame frame, List<Map<Integer, String>> keyValueMapList)
throws HyracksDataException {
FrameTupleAccessor fta = new FrameTupleAccessor(RecordDesc);
ByteBufferInputStream bbis = new ByteBufferInputStream();
DataInputStream di = new DataInputStream(bbis);
fta.reset(frame.getBuffer());
// fta.prettyPrint();
int preKey = Integer.MIN_VALUE;
for (int i = 0; i < fta.getTupleCount(); i++) {
bbis.setByteBuffer(fta.getBuffer(),
fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 0) + fta.getFieldSlotsLength());
int key = (int) RecordDesc.getFields()[0].deserialize(di);
bbis.setByteBuffer(fta.getBuffer(),
fta.getTupleStartOffset(i) + fta.getFieldStartOffset(i, 1) + fta.getFieldSlotsLength());
String value = (String) RecordDesc.getFields()[1].deserialize(di);
boolean found = false;
for (Map<Integer, String> map : keyValueMapList) {
if (map.containsKey(key) && map.get(key).equals(value)) {
found = true;
map.remove(key);
break;
}
}
assertTrue(found);
assertTrue(preKey <= key);
preKey = key;
}
}
static void prepareRandomInputRunList(IHyracksTaskContext ctx, int pageSize, int numRuns, int numFramesPerRun,
int minRecordSize, int maxRecordSize, List<TestFrameReader> readerList, List<IFrame> frameList,
List<Map<Integer, String>> keyValueMap) throws HyracksDataException {
for (int i = 0; i < numRuns; i++) {
readerList.add(new TestFrameReader(pageSize, numFramesPerRun, minRecordSize, maxRecordSize));
frameList.add(new VSizeFrame(ctx, readerList.get(readerList.size() - 1).maxFrameSize));
keyValueMap.add(readerList.get(readerList.size() - 1).result);
}
}
}