blob: 9732c6c4ff24780d5fba2fbc2e322a1c705c9c7c [file] [log] [blame]
package edu.uci.ics.asterix.algebra.operators.physical;
import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.base.IAObject;
import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import edu.uci.ics.hyracks.data.std.primitive.ShortPointable;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifierFactory;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.PartitionedLSMInvertedIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
/**
* Contributes the runtime operator for an unnest-map representing an
* inverted-index search.
*/
public class InvertedIndexPOperator extends IndexSearchPOperator {
private final boolean isPartitioned;
public InvertedIndexPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast,
boolean isPartitioned) {
super(idx, requiresBroadcast);
this.isPartitioned = isPartitioned;
}
@Override
public PhysicalOperatorTag getOperatorTag() {
if (isPartitioned) {
return PhysicalOperatorTag.FUZZY_INVERTED_INDEX_SEARCH;
} else {
return PhysicalOperatorTag.INVERTED_INDEX_SEARCH;
}
}
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
UnnestMapOperator unnestMapOp = (UnnestMapOperator) op;
ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
throw new IllegalStateException();
}
AbstractFunctionCallExpression unnestFuncExpr = (AbstractFunctionCallExpression) unnestExpr;
if (unnestFuncExpr.getFunctionIdentifier() != AsterixBuiltinFunctions.INDEX_SEARCH) {
return;
}
InvertedIndexJobGenParams jobGenParams = new InvertedIndexJobGenParams();
jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
Dataset dataset;
try {
dataset = MetadataManager.INSTANCE.getDataset(metadataProvider.getMetadataTxnContext(),
jobGenParams.getDataverseName(), jobGenParams.getDatasetName());
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas);
// Build runtime.
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> invIndexSearch = buildInvertedIndexRuntime(
metadataProvider, context, builder.getJobSpec(), unnestMapOp, opSchema, jobGenParams.getRetainInput(),
jobGenParams.getDatasetName(), dataset, jobGenParams.getIndexName(), jobGenParams.getSearchKeyType(),
keyIndexes, jobGenParams.getSearchModifierType(), jobGenParams.getSimilarityThreshold());
// Contribute operator in hyracks job.
builder.contributeHyracksOperator(unnestMapOp, invIndexSearch.first);
builder.contributeAlgebricksPartitionConstraint(invIndexSearch.first, invIndexSearch.second);
ILogicalOperator srcExchange = unnestMapOp.getInputs().get(0).getValue();
builder.contributeGraphEdge(srcExchange, 0, unnestMapOp, 0);
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInvertedIndexRuntime(
AqlMetadataProvider metadataProvider, JobGenContext context, JobSpecification jobSpec,
UnnestMapOperator unnestMap, IOperatorSchema opSchema, boolean retainInput, String datasetName,
Dataset dataset, String indexName, ATypeTag searchKeyType, int[] keyFields,
SearchModifierType searchModifierType, IAlgebricksConstantValue similarityThreshold)
throws AlgebricksException {
try {
IAObject simThresh = ((AsterixConstantValue) similarityThreshold).getObject();
IAType itemType = MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(),
dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype();
int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(metadataProvider.getMetadataTxnContext(),
dataset.getDataverseName(), dataset.getDatasetName(), indexName);
if (secondaryIndex == null) {
throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+ datasetName);
}
List<String> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
int numSecondaryKeys = secondaryKeyFields.size();
if (numSecondaryKeys != 1) {
throw new AlgebricksException(
"Cannot use "
+ numSecondaryKeys
+ " fields as a key for an inverted index. There can be only one field as a key for the inverted index index.");
}
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Only record types can be indexed.");
}
ARecordType recordType = (ARecordType) itemType;
Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), recordType);
IAType secondaryKeyType = keyPairType.first;
if (secondaryKeyType == null) {
throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
}
// TODO: For now we assume the type of the generated tokens is the
// same as the indexed field.
// We need a better way of expressing this because tokens may be
// hashed, or an inverted-index may index a list type, etc.
int numTokenKeys = (!isPartitioned) ? numSecondaryKeys : numSecondaryKeys + 1;
ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenKeys];
IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numTokenKeys];
for (int i = 0; i < numSecondaryKeys; i++) {
tokenComparatorFactories[i] = InvertedIndexAccessMethod
.getTokenBinaryComparatorFactory(secondaryKeyType);
tokenTypeTraits[i] = InvertedIndexAccessMethod.getTokenTypeTrait(secondaryKeyType);
}
if (isPartitioned) {
// The partitioning field is hardcoded to be a short *without* an Asterix type tag.
tokenComparatorFactories[numSecondaryKeys] = PointableBinaryComparatorFactory
.of(ShortPointable.FACTORY);
tokenTypeTraits[numSecondaryKeys] = ShortPointable.TYPE_TRAITS;
}
IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
List<LogicalVariable> outputVars = unnestMap.getVariables();
if (retainInput) {
outputVars = new ArrayList<LogicalVariable>();
VariableUtilities.getLiveVariables(unnestMap, outputVars);
}
RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
int start = outputRecDesc.getFieldCount() - numPrimaryKeys;
IBinaryComparatorFactory[] invListsComparatorFactories = JobGenHelper
.variablesToAscBinaryComparatorFactories(outputVars, start, numPrimaryKeys, typeEnv, context);
ITypeTraits[] invListsTypeTraits = JobGenHelper.variablesToTypeTraits(outputVars, start, numPrimaryKeys,
typeEnv, context);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadataProvider
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
datasetName, indexName);
// TODO: Here we assume there is only one search key field.
int queryField = keyFields[0];
// Get tokenizer and search modifier factories.
IInvertedIndexSearchModifierFactory searchModifierFactory = InvertedIndexAccessMethod
.getSearchModifierFactory(searchModifierType, simThresh, secondaryIndex);
IBinaryTokenizerFactory queryTokenizerFactory = InvertedIndexAccessMethod.getBinaryTokenizerFactory(
searchModifierType, searchKeyType, secondaryIndex);
IIndexDataflowHelperFactory dataflowHelperFactory;
if (!isPartitioned) {
dataflowHelperFactory = new LSMInvertedIndexDataflowHelperFactory(
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
} else {
dataflowHelperFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER);
}
LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new LSMInvertedIndexSearchOperatorDescriptor(
jobSpec, queryField, appContext.getStorageManagerInterface(), secondarySplitsAndConstraint.first,
appContext.getIndexLifecycleManagerProvider(), tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, invListsComparatorFactories, dataflowHelperFactory, queryTokenizerFactory,
searchModifierFactory, outputRecDesc, retainInput, NoOpOperationCallbackFactory.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(invIndexSearchOp,
secondarySplitsAndConstraint.second);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
}
}