blob: ceaff1e36aec38798ae24dfff1c759beb70a0199 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.storage.am.invertedindex.dataflow;
import java.io.DataOutput;
import java.nio.ByteBuffer;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import edu.uci.ics.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import edu.uci.ics.hyracks.dataflow.common.comm.util.FrameUtils;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDataflowHelper;
import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexResultCursor;
import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexSearchModifier;
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.InvertedIndex;
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.OccurrenceThresholdPanicException;
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.SearchResultCursor;
import edu.uci.ics.hyracks.storage.am.invertedindex.impls.TOccurrenceSearcher;
import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.IBinaryTokenizer;
/**
 * Push-based operator that runs one inverted-index search per input tuple.
 * <p>
 * For every tuple of every incoming frame, the field at {@code queryField} is handed to a
 * {@link TOccurrenceSearcher} together with the configured {@link IInvertedIndexSearchModifier}.
 * Each result delivered through the result cursor is copied field-by-field into an output
 * frame, which is flushed to the downstream {@code writer} whenever it fills up and once
 * more in {@link #close()} if it still holds tuples.
 */
public class InvertedIndexSearchOperatorNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
    private final TreeIndexDataflowHelper btreeDataflowHelper;
    private final InvertedIndexDataflowHelper invIndexDataflowHelper;
    private final IHyracksTaskContext ctx;
    private final int queryField;
    private final IInvertedIndexSearchModifier searchModifier;
    private final IBinaryTokenizer queryTokenizer;
    private final IRecordDescriptorProvider recordDescProvider;
    private final AbstractInvertedIndexOperatorDescriptor opDesc;

    // State below is created in open() and reused for every frame.
    private FrameTupleAccessor accessor;
    private FrameTupleReference tuple;
    private InvertedIndex invIndex;
    private TOccurrenceSearcher searcher;
    private IInvertedIndexResultCursor resultCursor;
    private ByteBuffer writeBuffer;
    private FrameTupleAppender appender;
    private ArrayTupleBuilder tb;
    private DataOutput dos;

    /**
     * Creates the operator and the two dataflow helpers (B-tree and inverted index) it relies on.
     *
     * @param opDesc             operator descriptor supplying the helper factory, operator id and
     *                           inverted-list type traits
     * @param ctx                task context, passed to the helpers and to the searcher
     * @param partition          partition number, passed to both dataflow helpers
     * @param queryField         index of the input-tuple field that is passed to the searcher
     * @param searchModifier     search modifier passed to every search
     * @param queryTokenizer     tokenizer handed to the searcher
     * @param recordDescProvider provider used in {@link #open()} to look up the input record descriptor
     */
    public InvertedIndexSearchOperatorNodePushable(AbstractInvertedIndexOperatorDescriptor opDesc,
            IHyracksTaskContext ctx, int partition, int queryField, IInvertedIndexSearchModifier searchModifier,
            IBinaryTokenizer queryTokenizer, IRecordDescriptorProvider recordDescProvider) {
        this.opDesc = opDesc;
        btreeDataflowHelper = (TreeIndexDataflowHelper) opDesc.getIndexDataflowHelperFactory()
                .createIndexDataflowHelper(opDesc, ctx, partition, false);
        invIndexDataflowHelper = new InvertedIndexDataflowHelper(btreeDataflowHelper, opDesc, ctx, partition, false);
        this.ctx = ctx;
        this.queryField = queryField;
        this.searchModifier = searchModifier;
        this.queryTokenizer = queryTokenizer;
        this.recordDescProvider = recordDescProvider;
    }

    /**
     * Initializes both index helpers, allocates the output frame and search state, and opens
     * the downstream writer.
     * <p>
     * If either helper fails to initialize, every helper that was touched so far is
     * de-initialized before the failure is rethrown.
     *
     * @throws HyracksDataException if a helper cannot be initialized or the writer cannot be opened
     */
    @Override
    public void open() throws HyracksDataException {
        RecordDescriptor recDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getOperatorId(), 0);
        accessor = new FrameTupleAccessor(btreeDataflowHelper.getHyracksTaskContext().getFrameSize(), recDesc);
        tuple = new FrameTupleReference();

        // BTree.
        try {
            btreeDataflowHelper.init();
        } catch (Exception e) {
            // Cleanup in case of failure.
            btreeDataflowHelper.deinit();
            throw asHyracksDataException(e);
        }

        // Inverted Index.
        try {
            invIndexDataflowHelper.init();
            invIndex = (InvertedIndex) invIndexDataflowHelper.getIndex();
        } catch (Exception e) {
            // Cleanup in case of failure. The BTree helper was initialized successfully above,
            // so it has to be released here as well; the finally guarantees that even if the
            // inverted-index cleanup itself throws.
            try {
                invIndexDataflowHelper.deinit();
            } finally {
                btreeDataflowHelper.deinit();
            }
            throw asHyracksDataException(e);
        }

        writeBuffer = btreeDataflowHelper.getHyracksTaskContext().allocateFrame();
        tb = new ArrayTupleBuilder(opDesc.getInvListsTypeTraits().length);
        dos = tb.getDataOutput();
        appender = new FrameTupleAppender(btreeDataflowHelper.getHyracksTaskContext().getFrameSize());
        appender.reset(writeBuffer, true);

        searcher = new TOccurrenceSearcher(ctx, invIndex, queryTokenizer);
        resultCursor = new SearchResultCursor(searcher.createResultFrameTupleAccessor(),
                searcher.createResultTupleReference());

        writer.open();
    }

    /**
     * Drains the result cursor, copying each result tuple into the output frame and flushing
     * the frame downstream whenever it is full.
     *
     * @throws IllegalStateException if a single result tuple does not fit into a freshly reset frame
     * @throws Exception             any failure raised by the cursor, the tuple builder or the writer
     */
    private void writeSearchResults() throws Exception {
        // The number of fields per result is fixed by the descriptor; read it once per call.
        final int invListFields = opDesc.getInvListsTypeTraits().length;
        while (resultCursor.hasNext()) {
            resultCursor.next();
            tb.reset();
            ITupleReference invListElement = resultCursor.getTuple();
            for (int i = 0; i < invListFields; i++) {
                dos.write(invListElement.getFieldData(i), invListElement.getFieldStart(i),
                        invListElement.getFieldLength(i));
                tb.addFieldEndOffset();
            }
            if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                // Frame is full: ship it and retry on an empty frame.
                FrameUtils.flushFrame(writeBuffer, writer);
                appender.reset(writeBuffer, true);
                if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
                    throw new IllegalStateException("Search result tuple does not fit into an empty output frame");
                }
            }
        }
    }

    /**
     * Runs one search per tuple in {@code buffer} and writes the results downstream.
     * <p>
     * A search that raises {@link OccurrenceThresholdPanicException} is skipped and produces
     * no output; processing continues with the next tuple.
     *
     * @param buffer frame holding the query tuples
     * @throws HyracksDataException if searching or writing results fails for any other reason
     */
    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        accessor.reset(buffer);
        int tupleCount = accessor.getTupleCount();
        try {
            for (int i = 0; i < tupleCount; i++) {
                tuple.reset(accessor, i);
                try {
                    searcher.reset();
                    searcher.search(resultCursor, tuple, queryField, searchModifier);
                    writeSearchResults();
                } catch (OccurrenceThresholdPanicException ignored) {
                    // Ignore panic cases for now.
                }
            }
        } catch (Exception e) {
            throw asHyracksDataException(e);
        }
    }

    /**
     * Propagates a failure notification to the downstream writer.
     *
     * @throws HyracksDataException if the downstream writer fails while handling the notification
     */
    @Override
    public void fail() throws HyracksDataException {
        writer.fail();
    }

    /**
     * Flushes any buffered output tuples, closes the downstream writer and de-initializes both
     * index helpers. The helpers are released even if flushing or closing the writer throws.
     *
     * @throws HyracksDataException if flushing, closing the writer or releasing a helper fails
     */
    @Override
    public void close() throws HyracksDataException {
        try {
            if (appender.getTupleCount() > 0) {
                FrameUtils.flushFrame(writeBuffer, writer);
            }
            writer.close();
        } finally {
            try {
                btreeDataflowHelper.deinit();
            } finally {
                invIndexDataflowHelper.deinit();
            }
        }
    }

    /**
     * Returns {@code e} unchanged if it already is a {@link HyracksDataException}; otherwise
     * wraps it in one so that the original cause is preserved.
     */
    private static HyracksDataException asHyracksDataException(Exception e) {
        if (e instanceof HyracksDataException) {
            return (HyracksDataException) e;
        }
        return new HyracksDataException(e);
    }
}