blob: ae2559b85ae9263296d81614c3726c047cff5fb3 [file] [log] [blame]
package edu.uci.ics.asterix.algebra.operators.physical;
import java.util.ArrayList;
import java.util.List;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.declared.AqlSourceId;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.base.IAObject;
import edu.uci.ics.asterix.om.constants.AsterixConstantValue;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod.SearchModifierType;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexJobGenParams;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.invertedindex.api.IInvertedIndexSearchModifierFactory;
import edu.uci.ics.hyracks.storage.am.invertedindex.dataflow.InvertedIndexSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.invertedindex.tokenizers.IBinaryTokenizerFactory;
/**
* Contributes the runtime operator for an unnest-map representing an inverted-index search.
*/
public class InvertedIndexPOperator extends IndexSearchPOperator {
public InvertedIndexPOperator(IDataSourceIndex<String, AqlSourceId> idx, boolean requiresBroadcast) {
super(idx, requiresBroadcast);
}
@Override
public PhysicalOperatorTag getOperatorTag() {
return PhysicalOperatorTag.INVERTED_INDEX_SEARCH;
}
@Override
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
UnnestMapOperator unnestMapOp = (UnnestMapOperator) op;
ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
if (unnestExpr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
throw new IllegalStateException();
}
AbstractFunctionCallExpression unnestFuncExpr = (AbstractFunctionCallExpression) unnestExpr;
if (unnestFuncExpr.getFunctionIdentifier() != AsterixBuiltinFunctions.INDEX_SEARCH) {
return;
}
InvertedIndexJobGenParams jobGenParams = new InvertedIndexJobGenParams();
jobGenParams.readFromFuncArgs(unnestFuncExpr.getArguments());
AqlMetadataProvider metadataProvider = (AqlMetadataProvider) context.getMetadataProvider();
AqlCompiledMetadataDeclarations metadata = metadataProvider.getMetadataDeclarations();
Dataset dataset = metadata.findDataset(jobGenParams.getDatasetName());
int[] keyIndexes = getKeyIndexes(jobGenParams.getKeyVarList(), inputSchemas);
// Build runtime.
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> invIndexSearch = buildInvertedIndexRuntime(metadata,
context, builder.getJobSpec(), unnestMapOp, opSchema, jobGenParams.getRetainInput(),
jobGenParams.getDatasetName(), dataset, jobGenParams.getIndexName(), jobGenParams.getSearchKeyType(),
keyIndexes, jobGenParams.getSearchModifierType(), jobGenParams.getSimilarityThreshold());
// Contribute operator in hyracks job.
builder.contributeHyracksOperator(unnestMapOp, invIndexSearch.first);
builder.contributeAlgebricksPartitionConstraint(invIndexSearch.first, invIndexSearch.second);
ILogicalOperator srcExchange = unnestMapOp.getInputs().get(0).getValue();
builder.contributeGraphEdge(srcExchange, 0, unnestMapOp, 0);
}
public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInvertedIndexRuntime(
AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
UnnestMapOperator unnestMap, IOperatorSchema opSchema, boolean retainInput, String datasetName,
Dataset dataset, String indexName, ATypeTag searchKeyType, int[] keyFields,
SearchModifierType searchModifierType, IAlgebricksConstantValue similarityThreshold)
throws AlgebricksException {
IAObject simThresh = ((AsterixConstantValue) similarityThreshold).getObject();
String itemTypeName = dataset.getItemTypeName();
IAType itemType = metadata.findType(itemTypeName);
int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
Index secondaryIndex = metadata.getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexName);
if (secondaryIndex == null) {
throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+ datasetName);
}
List<String> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
int numSecondaryKeys = secondaryKeyFields.size();
if (numSecondaryKeys != 1) {
throw new AlgebricksException(
"Cannot use "
+ numSecondaryKeys
+ " fields as a key for an inverted index. There can be only one field as a key for the inverted index index.");
}
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Only record types can be indexed.");
}
ARecordType recordType = (ARecordType) itemType;
Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), recordType);
IAType secondaryKeyType = keyPairType.first;
if (secondaryKeyType == null) {
throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
}
// TODO: For now we assume the type of the generated tokens is the same as the indexed field.
// We need a better way of expressing this because tokens may be hashed, or an inverted-index may index a list type, etc.
ITypeTraits[] tokenTypeTraits = new ITypeTraits[numSecondaryKeys];
IBinaryComparatorFactory[] tokenComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
for (int i = 0; i < numSecondaryKeys; i++) {
tokenComparatorFactories[i] = InvertedIndexAccessMethod.getTokenBinaryComparatorFactory(secondaryKeyType);
tokenTypeTraits[i] = InvertedIndexAccessMethod.getTokenTypeTrait(secondaryKeyType);
}
IVariableTypeEnvironment typeEnv = context.getTypeEnvironment(unnestMap);
List<LogicalVariable> outputVars = unnestMap.getVariables();
if (retainInput) {
outputVars = new ArrayList<LogicalVariable>();
VariableUtilities.getLiveVariables(unnestMap, outputVars);
}
RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
int start = outputRecDesc.getFieldCount() - numPrimaryKeys;
IBinaryComparatorFactory[] invListsComparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
outputVars, start, numPrimaryKeys, typeEnv, context);
ITypeTraits[] invListsTypeTraits = JobGenHelper.variablesToTypeTraits(outputVars, start, numPrimaryKeys,
typeEnv, context);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> secondarySplitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
Pair<IFileSplitProvider, IFileSplitProvider> fileSplitProviders = metadata
.getInvertedIndexFileSplitProviders(secondarySplitsAndConstraint.first);
// TODO: Here we assume there is only one search key field.
int queryField = keyFields[0];
// Get tokenizer and search modifier factories.
IInvertedIndexSearchModifierFactory searchModifierFactory = InvertedIndexAccessMethod.getSearchModifierFactory(
searchModifierType, simThresh, secondaryIndex);
IBinaryTokenizerFactory queryTokenizerFactory = InvertedIndexAccessMethod.getBinaryTokenizerFactory(
searchModifierType, searchKeyType, secondaryIndex);
InvertedIndexSearchOperatorDescriptor invIndexSearchOp = new InvertedIndexSearchOperatorDescriptor(jobSpec,
queryField, appContext.getStorageManagerInterface(), fileSplitProviders.first,
fileSplitProviders.second, appContext.getIndexRegistryProvider(), tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, invListsComparatorFactories,
new BTreeDataflowHelperFactory(), queryTokenizerFactory, searchModifierFactory, outputRecDesc,
retainInput, NoOpOperationCallbackProvider.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(invIndexSearchOp,
secondarySplitsAndConstraint.second);
}
}