blob: 6ad5a9c06bbe98e5b0ccaa33d5832c363ee60752 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.storage.am.invertedindex.impls;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.hyracks.api.context.IHyracksStageletContext;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeCursor;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeInteriorFrame;
import edu.uci.ics.hyracks.storage.am.btree.api.IBTreeLeafFrame;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOp;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTreeOpContext;
import edu.uci.ics.hyracks.storage.am.btree.impls.MultiComparator;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangeSearchCursor;
import edu.uci.ics.hyracks.storage.am.invertedindex.api.IBinaryTokenizer;
import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexResultCursor;
import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexSearcher;
public class SimpleConjunctiveSearcher implements IInvertedIndexSearcher {
private final int numKeyFields;
private final int numValueFields;
private final IBinaryComparator[] keyCmps;
private final IBinaryComparator[] valueCmps;
private final BTree btree;
private final IHyracksStageletContext ctx;
private final ArrayTupleBuilder resultTupleBuilder;
private final FrameTupleAppender resultTupleAppender;
private final FrameTupleAccessor resultFrameAccessor;
private List<ByteBuffer> newResultBuffers = new ArrayList<ByteBuffer>();
private List<ByteBuffer> prevResultBuffers = new ArrayList<ByteBuffer>();
private List<ByteBuffer> swap = null;
private final ListResultCursor resultCursor = new ListResultCursor();
private int maxResultBufIdx = 0;
private final IBTreeLeafFrame leafFrame;
private final IBTreeInteriorFrame interiorFrame;
private final IBTreeCursor btreeCursor;
private final FrameTupleReference searchKey = new FrameTupleReference();
private final RangePredicate pred = new RangePredicate(true, null, null, true, true, null, null);
private final IBinaryTokenizer queryTokenizer;
public SimpleConjunctiveSearcher(IHyracksStageletContext ctx, BTree btree, RecordDescriptor btreeRecDesc,
IBinaryTokenizer queryTokenizer, int numKeyFields, int numValueFields) {
this.ctx = ctx;
this.btree = btree;
this.queryTokenizer = queryTokenizer;
this.numKeyFields = numKeyFields;
this.numValueFields = numValueFields;
leafFrame = btree.getLeafFrameFactory().getFrame();
interiorFrame = btree.getInteriorFrameFactory().getFrame();
btreeCursor = new RangeSearchCursor(leafFrame);
resultTupleAppender = new FrameTupleAppender(ctx.getFrameSize());
resultTupleBuilder = new ArrayTupleBuilder(numValueFields);
newResultBuffers.add(ctx.allocateFrame());
prevResultBuffers.add(ctx.allocateFrame());
MultiComparator btreeCmp = btree.getMultiComparator();
keyCmps = new IBinaryComparator[numKeyFields];
for (int i = 0; i < numKeyFields; i++) {
keyCmps[i] = btreeCmp.getComparators()[i];
}
valueCmps = new IBinaryComparator[numValueFields];
for (int i = 0; i < numValueFields; i++) {
valueCmps[i] = btreeCmp.getComparators()[numKeyFields + i];
}
MultiComparator searchCmp = new MultiComparator(btreeCmp.getTypeTraits(), keyCmps);
pred.setLowKeyComparator(searchCmp);
pred.setHighKeyComparator(searchCmp);
pred.setLowKey(searchKey, true);
pred.setHighKey(searchKey, true);
ISerializerDeserializer[] valueSerde = new ISerializerDeserializer[numValueFields];
for (int i = 0; i < numValueFields; i++) {
valueSerde[i] = btreeRecDesc.getFields()[numKeyFields + i];
}
RecordDescriptor valueRecDesc = new RecordDescriptor(valueSerde);
resultFrameAccessor = new FrameTupleAccessor(ctx.getFrameSize(), valueRecDesc);
}
public void search(ITupleReference queryTuple, int queryFieldIndex) throws Exception {
// parse query, TODO: this parsing is too simple
RecordDescriptor queryTokenRecDesc = new RecordDescriptor(
new ISerializerDeserializer[] { UTF8StringSerializerDeserializer.INSTANCE });
ArrayTupleBuilder queryTokenBuilder = new ArrayTupleBuilder(queryTokenRecDesc.getFields().length);
DataOutput queryTokenDos = queryTokenBuilder.getDataOutput();
FrameTupleAppender queryTokenAppender = new FrameTupleAppender(ctx.getFrameSize());
ByteBuffer queryTokenFrame = ctx.allocateFrame();
queryTokenAppender.reset(queryTokenFrame, true);
queryTokenizer.reset(queryTuple.getFieldData(queryFieldIndex), queryTuple.getFieldStart(queryFieldIndex),
queryTuple.getFieldLength(queryFieldIndex));
while (queryTokenizer.hasNext()) {
queryTokenizer.next();
queryTokenBuilder.reset();
try {
queryTokenizer.writeToken(queryTokenDos);
queryTokenBuilder.addFieldEndOffset();
} catch (IOException e) {
throw new HyracksDataException(e);
}
// WARNING: assuming one frame is enough to hold all tokens
queryTokenAppender.append(queryTokenBuilder.getFieldEndOffsets(), queryTokenBuilder.getByteArray(), 0,
queryTokenBuilder.getSize());
}
FrameTupleAccessor queryTokenAccessor = new FrameTupleAccessor(ctx.getFrameSize(), queryTokenRecDesc);
queryTokenAccessor.reset(queryTokenFrame);
int numQueryTokens = queryTokenAccessor.getTupleCount();
maxResultBufIdx = 0;
BTreeOpContext opCtx = btree.createOpContext(BTreeOp.BTO_SEARCH, leafFrame, interiorFrame, null);
resultTupleAppender.reset(newResultBuffers.get(0), true);
try {
// append first inverted list to temporary results
searchKey.reset(queryTokenAccessor, 0);
btree.search(btreeCursor, pred, opCtx);
while (btreeCursor.hasNext()) {
btreeCursor.next();
maxResultBufIdx = appendTupleToNewResults(btreeCursor, maxResultBufIdx);
}
btreeCursor.close();
btreeCursor.reset();
} catch (Exception e) {
throw new HyracksDataException(e);
}
resultFrameAccessor.reset(newResultBuffers.get(0));
// intersect temporary results with remaining inverted lists
for (int i = 1; i < numQueryTokens; i++) {
swap = prevResultBuffers;
prevResultBuffers = newResultBuffers;
newResultBuffers = swap;
try {
searchKey.reset(queryTokenAccessor, i);
btree.search(btreeCursor, pred, opCtx);
maxResultBufIdx = intersectList(btreeCursor, prevResultBuffers, maxResultBufIdx, newResultBuffers);
} catch (Exception e) {
throw new HyracksDataException(e);
}
btreeCursor.close();
btreeCursor.reset();
}
}
private int appendTupleToNewResults(IBTreeCursor btreeCursor, int newBufIdx) throws IOException {
ByteBuffer newCurrentBuffer = newResultBuffers.get(newBufIdx);
ITupleReference tuple = btreeCursor.getTuple();
resultTupleBuilder.reset();
DataOutput dos = resultTupleBuilder.getDataOutput();
for (int i = 0; i < numValueFields; i++) {
int fIdx = numKeyFields + i;
dos.write(tuple.getFieldData(fIdx), tuple.getFieldStart(fIdx), tuple.getFieldLength(fIdx));
resultTupleBuilder.addFieldEndOffset();
}
if (!resultTupleAppender.append(resultTupleBuilder.getFieldEndOffsets(), resultTupleBuilder.getByteArray(), 0,
resultTupleBuilder.getSize())) {
newBufIdx++;
if (newBufIdx >= newResultBuffers.size()) {
newResultBuffers.add(ctx.allocateFrame());
}
newCurrentBuffer = newResultBuffers.get(newBufIdx);
resultTupleAppender.reset(newCurrentBuffer, true);
if (!resultTupleAppender.append(resultTupleBuilder.getFieldEndOffsets(), resultTupleBuilder.getByteArray(),
0, resultTupleBuilder.getSize())) {
throw new IllegalStateException();
}
}
return newBufIdx;
}
private int intersectList(IBTreeCursor btreeCursor, List<ByteBuffer> prevResultBuffers, int maxPrevBufIdx,
List<ByteBuffer> newResultBuffers) throws IOException, Exception {
int newBufIdx = 0;
ByteBuffer newCurrentBuffer = newResultBuffers.get(0);
int prevBufIdx = 0;
ByteBuffer prevCurrentBuffer = prevResultBuffers.get(0);
resultTupleBuilder.reset();
resultTupleAppender.reset(newCurrentBuffer, true);
resultFrameAccessor.reset(prevCurrentBuffer);
// WARNING: not very efficient but good enough for the first cut
boolean advanceCursor = true;
boolean advancePrevResult = false;
int resultTidx = 0;
while ((!advanceCursor || btreeCursor.hasNext()) && prevBufIdx <= maxPrevBufIdx
&& resultTidx < resultFrameAccessor.getTupleCount()) {
if (advanceCursor)
btreeCursor.next();
ITupleReference tuple = btreeCursor.getTuple();
int cmp = 0;
for (int i = 0; i < valueCmps.length; i++) {
int tupleFidx = numKeyFields + i;
cmp = valueCmps[i].compare(tuple.getFieldData(tupleFidx), tuple.getFieldStart(tupleFidx),
tuple.getFieldLength(tupleFidx), resultFrameAccessor.getBuffer().array(),
resultFrameAccessor.getTupleStartOffset(resultTidx) + resultFrameAccessor.getFieldSlotsLength()
+ resultFrameAccessor.getFieldStartOffset(resultTidx, i),
resultFrameAccessor.getFieldLength(resultTidx, i));
if (cmp != 0)
break;
}
// match found
if (cmp == 0) {
newBufIdx = appendTupleToNewResults(btreeCursor, newBufIdx);
advanceCursor = true;
advancePrevResult = true;
} else {
if (cmp < 0) {
advanceCursor = true;
advancePrevResult = false;
} else {
advanceCursor = false;
advancePrevResult = true;
}
}
if (advancePrevResult) {
resultTidx++;
if (resultTidx >= resultFrameAccessor.getTupleCount()) {
prevBufIdx++;
if (prevBufIdx <= maxPrevBufIdx) {
prevCurrentBuffer = prevResultBuffers.get(prevBufIdx);
resultFrameAccessor.reset(prevCurrentBuffer);
resultTidx = 0;
}
}
}
}
return newBufIdx;
}
@Override
public IInvertedIndexResultCursor getResultCursor() {
resultCursor.setResults(newResultBuffers, maxResultBufIdx + 1);
return resultCursor;
}
}