blob: c0cdd1129619dc3d46ef9d412203b3823af51fed [file] [log] [blame]
package edu.uci.ics.asterix.file;
import java.util.List;
import edu.uci.ics.asterix.common.context.AsterixRuntimeComponentsProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.optimizer.rules.am.InvertedIndexAccessMethod;
import edu.uci.ics.asterix.runtime.formats.FormatUtils;
import edu.uci.ics.asterix.translator.CompiledStatements.CompiledCreateIndexStatement;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.algebricks.data.ISerializerDeserializerProvider;
import edu.uci.ics.hyracks.algebricks.data.ITypeTraitProvider;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexCreateOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
import edu.uci.ics.hyracks.storage.common.file.TransientLocalResourceFactoryProvider;
public class SecondaryInvertedIndexCreator extends SecondaryIndexCreator {
private IAType secondaryKeyType;
private ITypeTraits[] invListsTypeTraits;
private IBinaryComparatorFactory[] tokenComparatorFactories;
private ITypeTraits[] tokenTypeTraits;
private IBinaryTokenizerFactory tokenizerFactory;
// For tokenization, sorting and loading. Represents <token, primary keys>.
private int numTokenKeyPairFields;
private IBinaryComparatorFactory[] tokenKeyPairComparatorFactories;
private RecordDescriptor tokenKeyPairRecDesc;
protected SecondaryInvertedIndexCreator(PhysicalOptimizationConfig physOptConf) {
super(physOptConf);
}
@Override
@SuppressWarnings("rawtypes")
protected void setSecondaryRecDescAndComparators(CompiledCreateIndexStatement createIndexStmt, AqlMetadataProvider metadata)
throws AlgebricksException, AsterixException {
// Sanity checks.
if (numPrimaryKeys > 1) {
throw new AsterixException("Cannot create inverted index on dataset with composite primary key.");
}
if (numSecondaryKeys > 1) {
throw new AsterixException("Cannot create composite inverted index on multiple fields.");
}
// Prepare record descriptor used in the assign op, and the optional
// select op.
List<String> secondaryKeyFields = createIndexStmt.getKeyFields();
secondaryFieldAccessEvalFactories = new ICopyEvaluatorFactory[numSecondaryKeys];
ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys + numSecondaryKeys];
ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numSecondaryKeys + numPrimaryKeys];
ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider();
ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider();
for (int i = 0; i < numSecondaryKeys; i++) {
secondaryFieldAccessEvalFactories[i] = FormatUtils.getDefaultFormat().getFieldAccessEvaluatorFactory(
itemType, secondaryKeyFields.get(i), numPrimaryKeys);
Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(i), itemType);
secondaryKeyType = keyTypePair.first;
anySecondaryKeyIsNullable = anySecondaryKeyIsNullable || keyTypePair.second;
ISerializerDeserializer keySerde = serdeProvider.getSerializerDeserializer(secondaryKeyType);
secondaryRecFields[i] = keySerde;
secondaryTypeTraits[i] = typeTraitProvider.getTypeTrait(secondaryKeyType);
}
secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
// Comparators and type traits for tokens.
tokenComparatorFactories = new IBinaryComparatorFactory[numSecondaryKeys];
tokenTypeTraits = new ITypeTraits[numSecondaryKeys];
tokenComparatorFactories[0] = InvertedIndexAccessMethod.getTokenBinaryComparatorFactory(secondaryKeyType);
tokenTypeTraits[0] = InvertedIndexAccessMethod.getTokenTypeTrait(secondaryKeyType);
// Set tokenizer factory.
// TODO: We might want to expose the hashing option at the AQL level,
// and add the choice to the index metadata.
tokenizerFactory = InvertedIndexAccessMethod.getBinaryTokenizerFactory(secondaryKeyType.getTypeTag(),
createIndexStmt.getIndexType(), createIndexStmt.getGramLength());
// Type traits for inverted-list elements. Inverted lists contain
// primary keys.
invListsTypeTraits = new ITypeTraits[numPrimaryKeys];
for (int i = 0; i < numPrimaryKeys; i++) {
invListsTypeTraits[i] = primaryRecDesc.getTypeTraits()[i];
}
// For tokenization, sorting and loading.
// One token + primary keys.
numTokenKeyPairFields = 1 + numPrimaryKeys;
ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
tokenKeyPairComparatorFactories = new IBinaryComparatorFactory[numTokenKeyPairFields];
tokenKeyPairFields[0] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
tokenKeyPairTypeTraits[0] = tokenTypeTraits[0];
tokenKeyPairComparatorFactories[0] = InvertedIndexAccessMethod
.getTokenBinaryComparatorFactory(secondaryKeyType);
for (int i = 0; i < numPrimaryKeys; i++) {
tokenKeyPairFields[i + 1] = primaryRecDesc.getFields()[i];
tokenKeyPairTypeTraits[i + 1] = primaryRecDesc.getTypeTraits()[i];
tokenKeyPairComparatorFactories[i + 1] = primaryComparatorFactories[i];
}
tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
}
@Override
public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
JobSpecification spec = new JobSpecification();
//TODO replace the transient one to persistent one
ILocalResourceFactoryProvider localResourceFactoryProvider = new TransientLocalResourceFactoryProvider();
LSMInvertedIndexCreateOperatorDescriptor invIndexCreateOp = new LSMInvertedIndexCreateOperatorDescriptor(spec,
AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, secondaryFileSplitProvider,
AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits, tokenComparatorFactories,
invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexCreateOp,
secondaryPartitionConstraint);
spec.addRoot(invIndexCreateOp);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
}
@Override
public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
JobSpecification spec = new JobSpecification();
// Create dummy key provider for feeding the primary index scan.
AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
// Create primary index scan op.
BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
// Assign op.
AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp, numSecondaryKeys);
// If any of the secondary fields are nullable, then add a select op
// that filters nulls.
AlgebricksMetaOperatorDescriptor selectOp = null;
if (anySecondaryKeyIsNullable) {
selectOp = createFilterNullsSelectOp(spec, numSecondaryKeys);
}
// Create a tokenizer op.
AbstractOperatorDescriptor tokenizerOp = createTokenizerOp(spec);
// Sort by token + primary keys.
ExternalSortOperatorDescriptor sortOp = createSortOp(spec, tokenKeyPairComparatorFactories, tokenKeyPairRecDesc);
// Create secondary inverted index bulk load op.
LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = createInvertedIndexBulkLoadOp(spec);
// Connect the operators.
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
if (anySecondaryKeyIsNullable) {
spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, tokenizerOp, 0);
} else {
spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, tokenizerOp, 0);
}
spec.connect(new OneToOneConnectorDescriptor(spec), tokenizerOp, 0, sortOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sortOp, 0, invIndexBulkLoadOp, 0);
spec.addRoot(invIndexBulkLoadOp);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
}
private AbstractOperatorDescriptor createTokenizerOp(JobSpecification spec) throws AlgebricksException {
int docField = 0;
int[] primaryKeyFields = new int[numPrimaryKeys];
for (int i = 0; i < numPrimaryKeys; i++) {
primaryKeyFields[i] = numSecondaryKeys + i;
}
BinaryTokenizerOperatorDescriptor tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec,
tokenKeyPairRecDesc, tokenizerFactory, docField, primaryKeyFields, false);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, tokenizerOp,
primaryPartitionConstraint);
return tokenizerOp;
}
@Override
protected ExternalSortOperatorDescriptor createSortOp(JobSpecification spec,
IBinaryComparatorFactory[] secondaryComparatorFactories, RecordDescriptor secondaryRecDesc) {
// Sort on token and primary keys.
int[] sortFields = new int[numTokenKeyPairFields];
for (int i = 0; i < numTokenKeyPairFields; i++) {
sortFields[i] = i;
}
ExternalSortOperatorDescriptor sortOp = new ExternalSortOperatorDescriptor(spec,
physOptConf.getMaxFramesExternalSort(), sortFields, tokenKeyPairComparatorFactories, secondaryRecDesc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sortOp, primaryPartitionConstraint);
return sortOp;
}
private LSMInvertedIndexBulkLoadOperatorDescriptor createInvertedIndexBulkLoadOp(JobSpecification spec) {
int[] fieldPermutation = new int[numSecondaryKeys + numPrimaryKeys];
for (int i = 0; i < numTokenKeyPairFields; i++) {
fieldPermutation[i] = i;
}
LSMInvertedIndexBulkLoadOperatorDescriptor invIndexBulkLoadOp = new LSMInvertedIndexBulkLoadOperatorDescriptor(
spec, fieldPermutation, false, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER,
secondaryFileSplitProvider, AsterixRuntimeComponentsProvider.NOINDEX_PROVIDER, tokenTypeTraits,
tokenComparatorFactories, invListsTypeTraits, primaryComparatorFactories, tokenizerFactory,
new LSMInvertedIndexDataflowHelperFactory(AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER,
AsterixRuntimeComponentsProvider.LSMINVERTEDINDEX_PROVIDER),
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, invIndexBulkLoadOp,
secondaryPartitionConstraint);
return invIndexBulkLoadOp;
}
}