blob: fbdfd6438eba2be2c0ec7f8fa265aee9fc3ec3eb [file] [log] [blame]
/*
* Copyright 2009-2012 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.storage.am.lsm.invertedindex.search;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import edu.uci.ics.hyracks.api.context.IHyracksCommonContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import edu.uci.ics.hyracks.storage.am.common.api.IndexException;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndex;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedListCursor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeFrameTupleAccessor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.ondisk.FixedSizeTupleReference;
// TODO: The merge procedure is rather confusing regarding cursor positions, hasNext() calls etc.
// Needs an overhaul some time.
/**
 * Merges a collection of inverted lists into a {@link SearchResult}, keeping an occurrence count per
 * result tuple and (for suffix lists) pruning tuples that can no longer exceed the occurrence threshold.
 *
 * <p>
 * Two internal {@link SearchResult} instances are used as ping-pong buffers: after each list is merged the
 * freshly produced result becomes the "previous" result for the next list. The output of merging the last
 * list is written to the caller-supplied search result.
 *
 * <p>
 * Result tuples carry their occurrence count as an int located at the start offset of the tuple's last
 * field (see {@link #getCount(FixedSizeTupleReference)}).
 */
public class InvertedListMerger {

    /** Comparator used to compare inverted-list elements against result tuples. */
    protected final MultiComparator invListCmp;
    /** Intermediate result produced by the previously merged list (input of the current merge step). */
    protected SearchResult prevSearchResult;
    /** Intermediate result that receives the output of the current merge step. */
    protected SearchResult newSearchResult;

    /**
     * Creates a merger whose intermediate results use the inverted-list type traits of the given index.
     *
     * @param ctx
     *            context handed to the intermediate {@link SearchResult}
     * @param invIndex
     *            index supplying the inverted-list comparator factories and type traits
     */
    public InvertedListMerger(IHyracksCommonContext ctx, IInvertedIndex invIndex) {
        this.invListCmp = MultiComparator.createIgnoreFieldLength(invIndex.getInvListCmpFactories());
        this.prevSearchResult = new SearchResult(invIndex.getInvListTypeTraits(), ctx);
        this.newSearchResult = new SearchResult(prevSearchResult);
    }

    /**
     * Merges all given inverted lists, appending the outcome of the last merge step to {@code searchResult}.
     *
     * <p>
     * The cursor list is sorted in place (natural ordering of the cursors; presumably by list size, to be
     * confirmed against the cursor implementation). The first {@code numPrefixLists} lists after sorting are
     * merged as prefix lists (union with counting); the remaining lists are merged as suffix lists, either by
     * probing or by a sort-merge scan, whichever is estimated to be cheaper. If the cursor list is empty,
     * {@code searchResult} is left untouched.
     *
     * @param invListCursors
     *            cursors over the inverted lists to merge; sorted in place
     * @param occurrenceThreshold
     *            threshold used to prune candidates while merging suffix lists
     * @param numPrefixLists
     *            number of leading lists (after sorting) to treat as prefix lists
     * @param searchResult
     *            destination for the output of merging the last list
     * @throws HyracksDataException
     *             propagated from the cursors or the search results
     * @throws IndexException
     *             propagated from the cursors
     */
    public void merge(ArrayList<IInvertedListCursor> invListCursors, int occurrenceThreshold, int numPrefixLists,
            SearchResult searchResult) throws HyracksDataException, IndexException {
        Collections.sort(invListCursors);
        int numInvLists = invListCursors.size();
        for (int i = 0; i < numInvLists; i++) {
            // Swap the ping-pong buffers: the output of the previous step becomes the input of this one.
            SearchResult swapTemp = prevSearchResult;
            prevSearchResult = newSearchResult;
            newSearchResult = swapTemp;
            newSearchResult.reset();

            // Use the temporary search result unless this is the last list, in which case the output
            // goes straight into the caller's final search result.
            SearchResult result = (i + 1 != numInvLists) ? newSearchResult : searchResult;

            IInvertedListCursor invListCursor = invListCursors.get(i);
            invListCursor.pinPages();
            try {
                if (i < numPrefixLists) {
                    // Merge prefix list.
                    mergePrefixList(invListCursor, prevSearchResult, result);
                } else {
                    // Merge suffix list.
                    int numInvListElements = invListCursor.size();
                    int currentNumResults = prevSearchResult.getNumResults();
                    // Should we binary search the next list or should we sort-merge it?
                    if (currentNumResults * Math.log(numInvListElements) < currentNumResults
                            + numInvListElements) {
                        mergeSuffixListProbe(invListCursor, prevSearchResult, result, i, numInvLists,
                                occurrenceThreshold);
                    } else {
                        mergeSuffixListScan(invListCursor, prevSearchResult, result, i, numInvLists,
                                occurrenceThreshold);
                    }
                }
            } finally {
                // Always release the pages pinned above, even if the merge step failed.
                invListCursor.unpinPages();
            }
        }
    }

    /**
     * Merges a suffix list by probing it once for every tuple of the previous result.
     *
     * <p>
     * A tuple found in the list is appended with its count incremented. A tuple not found is appended with
     * its count unchanged only if {@code count + numInvLists - invListIx > occurrenceThreshold}; otherwise
     * it is dropped.
     *
     * @param invListCursor
     *            cursor over the suffix list being merged
     * @param prevSearchResult
     *            candidates produced by the previous merge step
     * @param newSearchResult
     *            destination for the surviving candidates
     * @param invListIx
     *            zero-based index of the list being merged
     * @param numInvLists
     *            total number of lists being merged
     * @param occurrenceThreshold
     *            pruning threshold
     */
    protected void mergeSuffixListProbe(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
            SearchResult newSearchResult, int invListIx, int numInvLists, int occurrenceThreshold)
            throws HyracksDataException, IndexException {
        int prevBufIdx = 0;
        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);

        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();

        int resultTidx = 0;
        resultFrameTupleAcc.reset(prevCurrentBuffer);

        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));
            int count = getCount(resultTuple);

            if (invListCursor.containsKey(resultTuple, invListCmp)) {
                count++;
                newSearchResult.append(resultTuple, count);
            } else {
                if (count + numInvLists - invListIx > occurrenceThreshold) {
                    newSearchResult.append(resultTuple, count);
                }
            }

            // Advance to the next previous-result tuple, moving on to the next buffer when the current one
            // is exhausted. After the last buffer, resultTidx stays at the tuple count and the loop ends.
            resultTidx++;
            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
                prevBufIdx++;
                if (prevBufIdx <= maxPrevBufIdx) {
                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
                    resultFrameTupleAcc.reset(prevCurrentBuffer);
                    resultTidx = 0;
                }
            }
        }
    }

    /**
     * Merges a suffix list by scanning it in lockstep with the previous result (sort-merge).
     *
     * <p>
     * Matching tuples are appended with their count incremented. Previous-result tuples without a match are
     * appended with their count unchanged only if
     * {@code count + numInvLists - invListIx > occurrenceThreshold}. List elements that are not in the
     * previous result are skipped, i.e. a suffix list never introduces new candidates.
     *
     * @param invListCursor
     *            cursor over the suffix list being merged
     * @param prevSearchResult
     *            candidates produced by the previous merge step
     * @param newSearchResult
     *            destination for the surviving candidates
     * @param invListIx
     *            zero-based index of the list being merged
     * @param numInvLists
     *            total number of lists being merged
     * @param occurrenceThreshold
     *            pruning threshold
     */
    protected void mergeSuffixListScan(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
            SearchResult newSearchResult, int invListIx, int numInvLists, int occurrenceThreshold)
            throws HyracksDataException, IndexException {
        int prevBufIdx = 0;
        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);

        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();

        boolean advanceCursor = true;
        boolean advancePrevResult = false;
        int resultTidx = 0;

        resultFrameTupleAcc.reset(prevCurrentBuffer);

        // invListTidx counts consumed list elements; the cursor itself is positioned one call ahead.
        int invListTidx = 0;
        int invListNumTuples = invListCursor.size();

        if (invListCursor.hasNext()) {
            invListCursor.next();
        }

        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
            ITupleReference invListTuple = invListCursor.getTuple();

            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));

            int cmp = invListCmp.compare(invListTuple, resultTuple);
            if (cmp == 0) {
                // Present in both: bump the occurrence count.
                int count = getCount(resultTuple) + 1;
                newSearchResult.append(resultTuple, count);
                advanceCursor = true;
                advancePrevResult = true;
            } else {
                if (cmp < 0) {
                    // List element precedes the current candidate: skip it.
                    advanceCursor = true;
                    advancePrevResult = false;
                } else {
                    // Candidate is absent from this list: keep it only if it passes the threshold test.
                    int count = getCount(resultTuple);
                    if (count + numInvLists - invListIx > occurrenceThreshold) {
                        newSearchResult.append(resultTuple, count);
                    }
                    advanceCursor = false;
                    advancePrevResult = true;
                }
            }

            if (advancePrevResult) {
                resultTidx++;
                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
                    prevBufIdx++;
                    if (prevBufIdx <= maxPrevBufIdx) {
                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
                        resultFrameTupleAcc.reset(prevCurrentBuffer);
                        resultTidx = 0;
                    }
                }
            }

            if (advanceCursor) {
                invListTidx++;
                if (invListCursor.hasNext()) {
                    invListCursor.next();
                }
            }
        }

        // append remaining elements from previous result set
        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));

            int count = getCount(resultTuple);
            if (count + numInvLists - invListIx > occurrenceThreshold) {
                newSearchResult.append(resultTuple, count);
            }

            resultTidx++;
            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
                prevBufIdx++;
                if (prevBufIdx <= maxPrevBufIdx) {
                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
                    resultFrameTupleAcc.reset(prevCurrentBuffer);
                    resultTidx = 0;
                }
            }
        }
    }

    /**
     * Merges a prefix list with the previous result as a counting union (sort-merge, no pruning).
     *
     * <p>
     * Tuples present in both inputs are appended with their count incremented, list elements not in the
     * previous result are appended with count 1, and previous-result tuples not in the list are appended with
     * their count unchanged.
     *
     * @param invListCursor
     *            cursor over the prefix list being merged
     * @param prevSearchResult
     *            candidates produced by the previous merge step
     * @param newSearchResult
     *            destination for the merged candidates
     */
    protected void mergePrefixList(IInvertedListCursor invListCursor, SearchResult prevSearchResult,
            SearchResult newSearchResult) throws HyracksDataException, IndexException {
        int prevBufIdx = 0;
        int maxPrevBufIdx = prevSearchResult.getCurrentBufferIndex();
        ByteBuffer prevCurrentBuffer = prevSearchResult.getBuffers().get(0);

        FixedSizeFrameTupleAccessor resultFrameTupleAcc = prevSearchResult.getAccessor();
        FixedSizeTupleReference resultTuple = prevSearchResult.getTuple();

        boolean advanceCursor = true;
        boolean advancePrevResult = false;
        int resultTidx = 0;

        resultFrameTupleAcc.reset(prevCurrentBuffer);

        // invListTidx counts consumed list elements; the cursor itself is positioned one call ahead.
        int invListTidx = 0;
        int invListNumTuples = invListCursor.size();

        if (invListCursor.hasNext()) {
            invListCursor.next();
        }

        while (invListTidx < invListNumTuples && resultTidx < resultFrameTupleAcc.getTupleCount()) {
            ITupleReference invListTuple = invListCursor.getTuple();

            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));

            int cmp = invListCmp.compare(invListTuple, resultTuple);
            if (cmp == 0) {
                // Present in both: bump the occurrence count.
                int count = getCount(resultTuple) + 1;
                newSearchResult.append(resultTuple, count);
                advanceCursor = true;
                advancePrevResult = true;
            } else {
                if (cmp < 0) {
                    // New candidate contributed by this list.
                    int count = 1;
                    newSearchResult.append(invListTuple, count);
                    advanceCursor = true;
                    advancePrevResult = false;
                } else {
                    // Existing candidate not in this list: carry it over unchanged.
                    int count = getCount(resultTuple);
                    newSearchResult.append(resultTuple, count);
                    advanceCursor = false;
                    advancePrevResult = true;
                }
            }

            if (advancePrevResult) {
                resultTidx++;
                if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
                    prevBufIdx++;
                    if (prevBufIdx <= maxPrevBufIdx) {
                        prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
                        resultFrameTupleAcc.reset(prevCurrentBuffer);
                        resultTidx = 0;
                    }
                }
            }

            if (advanceCursor) {
                invListTidx++;
                if (invListCursor.hasNext()) {
                    invListCursor.next();
                }
            }
        }

        // append remaining new elements from inverted list
        while (invListTidx < invListNumTuples) {
            ITupleReference invListTuple = invListCursor.getTuple();
            newSearchResult.append(invListTuple, 1);
            invListTidx++;
            if (invListCursor.hasNext()) {
                invListCursor.next();
            }
        }

        // append remaining elements from previous result set
        while (resultTidx < resultFrameTupleAcc.getTupleCount()) {
            resultTuple.reset(prevCurrentBuffer.array(), resultFrameTupleAcc.getTupleStartOffset(resultTidx));

            int count = getCount(resultTuple);
            newSearchResult.append(resultTuple, count);

            resultTidx++;
            if (resultTidx >= resultFrameTupleAcc.getTupleCount()) {
                prevBufIdx++;
                if (prevBufIdx <= maxPrevBufIdx) {
                    prevCurrentBuffer = prevSearchResult.getBuffers().get(prevBufIdx);
                    resultFrameTupleAcc.reset(prevCurrentBuffer);
                    resultTidx = 0;
                }
            }
        }
    }

    /**
     * Creates a new {@link SearchResult} built from this merger's current previous-result instance.
     *
     * @return a new search result constructed from {@code prevSearchResult}
     */
    public SearchResult createSearchResult() {
        return new SearchResult(prevSearchResult);
    }

    /** Clears both intermediate search results. */
    public void reset() {
        prevSearchResult.clear();
        newSearchResult.clear();
    }

    /**
     * Reads the occurrence count of a result tuple: an int located at the start offset of the tuple's last
     * field.
     */
    private static int getCount(FixedSizeTupleReference resultTuple) {
        return IntegerSerializerDeserializer.getInt(resultTuple.getFieldData(0),
                resultTuple.getFieldStart(resultTuple.getFieldCount() - 1));
    }
}