blob: a75a75d8e05f5250fca71532a50cb2673b70d6e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.metadata.utils;
import static org.apache.asterix.common.api.IIdentifierMapper.Modifier.PLURAL;
import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
import java.util.List;
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.context.AsterixVirtualBufferCacheProvider;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.formats.nontagged.NullIntrospector;
import org.apache.asterix.metadata.api.IResourceFactoryProvider;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.data.ITypeTraitProvider;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.data.std.accessors.ShortBinaryComparatorFactory;
import org.apache.hyracks.data.std.primitive.ShortPointable;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManagerFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMPageWriteCallbackFactory;
import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.LSMInvertedIndexLocalResourceFactory;
import org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.IFullTextConfigEvaluatorFactory;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.storage.common.IStorageManager;
/**
 * {@link IResourceFactoryProvider} that assembles the
 * {@link LSMInvertedIndexLocalResourceFactory} for an index whose details are
 * {@link Index.TextIndexDetails}.
 * <p>
 * The provider has no state of its own and is exposed only through {@link #INSTANCE}.
 */
public class InvertedIndexResourceFactoryProvider implements IResourceFactoryProvider {

    /** The single shared instance of this provider. */
    public static final InvertedIndexResourceFactoryProvider INSTANCE = new InvertedIndexResourceFactoryProvider();

    private InvertedIndexResourceFactoryProvider() {
    }

    /**
     * Creates the local resource factory for the given inverted index.
     * <p>
     * Only {@code INTERNAL} datasets with at most one primary key field are accepted, and the index
     * may have at most one secondary key field. When the dataset declares a filter field, the
     * field-position arrays handed to the factory are populated; otherwise they stay {@code null}.
     *
     * @param mdProvider            source of the storage component provider, storage properties and data format
     * @param dataset               dataset that owns the index
     * @param index                 the index; its details must be {@link Index.TextIndexDetails}
     * @param recordType            record type of the dataset
     * @param metaType              meta record type of the dataset
     * @param mergePolicyFactory    merge policy factory, forwarded unchanged
     * @param mergePolicyProperties merge policy properties, forwarded unchanged
     * @param filterTypeTraits      filter type traits, forwarded unchanged
     * @param filterCmpFactories    filter comparator factories, forwarded unchanged
     * @return a new {@link LSMInvertedIndexLocalResourceFactory}
     * @throws AlgebricksException if the dataset type or key counts are not supported, or if any of
     *                             the delegated lookups fails
     */
    @Override
    public IResourceFactory getResourceFactory(MetadataProvider mdProvider, Dataset dataset, Index index,
            ARecordType recordType, ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
            Map<String, String> mergePolicyProperties, ITypeTraits[] filterTypeTraits,
            IBinaryComparatorFactory[] filterCmpFactories) throws AlgebricksException {
        // Collect the key and filter descriptions.
        final List<List<String>> pkFields = dataset.getPrimaryKeys();
        final Index.TextIndexDetails textDetails = (Index.TextIndexDetails) index.getIndexDetails();
        final List<List<String>> skFields = textDetails.getKeyFieldNames();
        final List<String> filterField = DatasetUtil.getFilterField(dataset);
        final int pkCount = pkFields.size();
        final int skCount = skFields.size();

        // Reject unsupported configurations.
        if (dataset.getDatasetType() != DatasetType.INTERNAL) {
            throw new CompilationException(ErrorCode.COMPILATION_INDEX_TYPE_NOT_SUPPORTED_FOR_DATASET_TYPE,
                    index.getIndexType().name(), dataset.getDatasetType());
        }
        if (pkCount > 1) {
            throw new AsterixException(
                    "Cannot create inverted index on " + dataset(PLURAL) + " with composite primary key.");
        }
        if (skCount > 1) {
            throw new AsterixException("Cannot create composite inverted index on multiple fields.");
        }

        final boolean partitioned = isLengthPartitioned(index.getIndexType());
        // A partitioned index carries one extra field in front of the primary key(s).
        final int tokenKeyPairFieldCount = partitioned ? 2 + pkCount : 1 + pkCount;

        int[] invIndexFields = null;
        int[] filterFieldsNonBulkLoad = null;
        int[] invIndexFieldsNonBulkLoad = null;
        int[] filterFields = null;
        if (filterField != null) {
            final int keyFieldCount = skCount + pkCount;
            invIndexFields = identityFields(tokenKeyPairFieldCount);
            // The filter value is positioned right after the secondary and primary key fields.
            filterFieldsNonBulkLoad = new int[filterField.size()];
            filterFieldsNonBulkLoad[0] = keyFieldCount;
            invIndexFieldsNonBulkLoad = identityFields(keyFieldCount);
            // Equivalent to (tokenKeyPairFieldCount - pkCount) + pkCount: the first position
            // after the token/key fields.
            filterFields = new int[filterField.size()];
            filterFields[0] = tokenKeyPairFieldCount;
        }

        // Storage-level collaborators.
        final IStorageComponentProvider componentProvider = mdProvider.getStorageComponentProvider();
        final IStorageManager storageMgr = componentProvider.getStorageManager();
        final ILSMOperationTrackerFactory trackerFactory = dataset.getIndexOperationTrackerFactory(index);
        final ILSMIOOperationCallbackFactory ioCallbackFactory = dataset.getIoOperationCallbackFactory(index);
        final ILSMPageWriteCallbackFactory pageWriteFactory = dataset.getPageWriteCallbackFactory();
        final IMetadataPageManagerFactory pageManagerFactory = componentProvider.getMetadataPageManagerFactory();
        final AsterixVirtualBufferCacheProvider bufferCacheProvider =
                new AsterixVirtualBufferCacheProvider(dataset.getDatasetId());
        final ILSMIOOperationSchedulerProvider schedulerProvider =
                componentProvider.getIoOperationSchedulerProvider();
        final double bloomFilterFpr = mdProvider.getStorageProperties().getBloomFilterFalsePositiveRate();

        // Type traits, comparators and tokenizer for the inverted lists and the tokens.
        final ITypeTraits[] invListTraits = getInvListTypeTraits(mdProvider, dataset, recordType, metaType);
        final IBinaryComparatorFactory[] invListCmpFactories =
                getInvListComparatorFactories(mdProvider, dataset, recordType, metaType);
        final ITypeTraits[] tokenTraits = getTokenTypeTraits(dataset, index, recordType, metaType);
        final IBinaryComparatorFactory[] tokenCmps = getTokenComparatorFactories(dataset, index, recordType, metaType);
        final IBinaryTokenizerFactory tokenizer = getTokenizerFactory(dataset, index, recordType, metaType);
        final IFullTextConfigEvaluatorFactory fullTextEvaluatorFactory =
                FullTextUtil.fetchFilterAndCreateConfigEvaluator(mdProvider, index.getDataverseName(),
                        textDetails.getFullTextConfigName());
        final ITypeTraitProvider traitProvider = mdProvider.getDataFormat().getTypeTraitProvider();

        return new LSMInvertedIndexLocalResourceFactory(storageMgr, invListTraits, invListCmpFactories,
                filterTypeTraits, filterCmpFactories, filterFields, trackerFactory, ioCallbackFactory,
                pageWriteFactory, pageManagerFactory, bufferCacheProvider, schedulerProvider, mergePolicyFactory,
                mergePolicyProperties, true, tokenTraits, tokenCmps, tokenizer, fullTextEvaluatorFactory,
                partitioned, invIndexFields, filterFieldsNonBulkLoad, invIndexFieldsNonBulkLoad, bloomFilterFpr,
                traitProvider.getTypeTrait(BuiltinType.ANULL), NullIntrospector.INSTANCE);
    }

    /**
     * Returns the type traits of the inverted list elements. The elements of an inverted list are
     * the primary key(s) of the indexed rows, so the result is the dataset's primary type traits
     * with the last entry left out.
     */
    private static ITypeTraits[] getInvListTypeTraits(MetadataProvider metadataProvider, Dataset dataset,
            ARecordType recordType, ARecordType metaType) throws AlgebricksException {
        ITypeTraits[] allPrimaryTraits = dataset.getPrimaryTypeTraits(metadataProvider, recordType, metaType);
        ITypeTraits[] invListTraits = new ITypeTraits[allPrimaryTraits.length - 1];
        System.arraycopy(allPrimaryTraits, 0, invListTraits, 0, invListTraits.length);
        return invListTraits;
    }

    /** Returns the comparator factories of the inverted list elements: the dataset's primary comparators. */
    private static IBinaryComparatorFactory[] getInvListComparatorFactories(MetadataProvider metadataProvider,
            Dataset dataset, ARecordType recordType, ARecordType metaType) throws AlgebricksException {
        return dataset.getPrimaryComparatorFactories(metadataProvider, recordType, metaType);
    }

    /**
     * Returns the type traits of the token fields: the token trait derived from the secondary key
     * type, followed by a {@code short} trait when the index is length-partitioned.
     */
    private static ITypeTraits[] getTokenTypeTraits(Dataset dataset, Index index, ARecordType recordType,
            ARecordType metaType) throws AlgebricksException {
        int pkCount = dataset.getPrimaryKeys().size();
        Index.TextIndexDetails textDetails = (Index.TextIndexDetails) index.getIndexDetails();
        int skCount = textDetails.getKeyFieldNames().size();
        IndexType kind = index.getIndexType();
        requireSingleKeys(dataset, kind, pkCount, skCount);

        boolean partitioned = isLengthPartitioned(kind);
        IAType keyType = secondaryKeyTypeOf(index, textDetails, recordType, metaType);

        ITypeTraits[] traits = new ITypeTraits[partitioned ? skCount + 1 : skCount];
        traits[0] = NonTaggedFormatUtil.getTokenTypeTrait(keyType);
        if (partitioned) {
            // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
            traits[1] = ShortPointable.TYPE_TRAITS;
        }
        return traits;
    }

    /**
     * Returns the comparator factories of the token fields: the token comparator derived from the
     * secondary key type, followed by a {@code short} comparator when the index is length-partitioned.
     */
    private static IBinaryComparatorFactory[] getTokenComparatorFactories(Dataset dataset, Index index,
            ARecordType recordType, ARecordType metaType) throws AlgebricksException {
        int pkCount = dataset.getPrimaryKeys().size();
        Index.TextIndexDetails textDetails = (Index.TextIndexDetails) index.getIndexDetails();
        int skCount = textDetails.getKeyFieldNames().size();
        IndexType kind = index.getIndexType();
        requireSingleKeys(dataset, kind, pkCount, skCount);

        boolean partitioned = isLengthPartitioned(kind);
        IAType keyType = secondaryKeyTypeOf(index, textDetails, recordType, metaType);

        IBinaryComparatorFactory[] comparators = new IBinaryComparatorFactory[partitioned ? skCount + 1 : skCount];
        comparators[0] = NonTaggedFormatUtil.getTokenBinaryComparatorFactory(keyType);
        if (partitioned) {
            // The partitioning field is hardcoded to be a short *without* an Asterix type tag.
            comparators[1] = ShortBinaryComparatorFactory.INSTANCE;
        }
        return comparators;
    }

    /**
     * Returns the tokenizer factory selected by the secondary key's type tag, the index type and the
     * gram length stored in the index details.
     */
    private static IBinaryTokenizerFactory getTokenizerFactory(Dataset dataset, Index index, ARecordType recordType,
            ARecordType metaType) throws AlgebricksException {
        int pkCount = dataset.getPrimaryKeys().size();
        Index.TextIndexDetails textDetails = (Index.TextIndexDetails) index.getIndexDetails();
        int skCount = textDetails.getKeyFieldNames().size();
        IndexType kind = index.getIndexType();
        requireSingleKeys(dataset, kind, pkCount, skCount);

        IAType keyType = secondaryKeyTypeOf(index, textDetails, recordType, metaType);
        // TODO: We might want to expose the hashing option at the AQL level,
        // and add the choice to the index metadata.
        return NonTaggedFormatUtil.getBinaryTokenizerFactory(keyType.getTypeTag(), kind,
                textDetails.getGramLength());
    }

    /** Tells whether the index type is one of the two length-partitioned inverted index kinds. */
    private static boolean isLengthPartitioned(IndexType indexType) {
        return indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
    }

    /** Returns {@code [0, 1, ..., count - 1]}. */
    private static int[] identityFields(int count) {
        int[] fields = new int[count];
        for (int pos = 0; pos < count; pos++) {
            fields[pos] = pos;
        }
        return fields;
    }

    /**
     * Sanity check shared by the token helpers: fails when the dataset has more than one primary key
     * field or the index has more than one secondary key field.
     */
    private static void requireSingleKeys(Dataset dataset, IndexType indexType, int numPrimaryKeys,
            int numSecondaryKeys) throws CompilationException {
        if (numPrimaryKeys > 1) {
            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_FOR_DATASET_WITH_COMPOSITE_PRIMARY_INDEX,
                    indexType, DatasetUtil.getFullyQualifiedDisplayName(dataset));
        }
        if (numSecondaryKeys > 1) {
            throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_INDEX_NUM_OF_FIELD, numSecondaryKeys,
                    indexType, 1);
        }
    }

    /**
     * Resolves the type of the first (and only) secondary key field. The field is looked up in the
     * record type when there are no source indicators or the first indicator is {@code 0}, and in
     * the meta type otherwise.
     */
    private static IAType secondaryKeyTypeOf(Index index, Index.TextIndexDetails textDetails,
            ARecordType recordType, ARecordType metaType) throws AlgebricksException {
        List<Integer> sourceIndicators = textDetails.getKeyFieldSourceIndicators();
        boolean keyInRecord = sourceIndicators == null || sourceIndicators.get(0) == 0;
        ARecordType owningType = keyInRecord ? recordType : metaType;
        Pair<IAType, Boolean> resolved = Index.getNonNullableOpenFieldType(index,
                textDetails.getKeyFieldTypes().get(0), textDetails.getKeyFieldNames().get(0), owningType);
        return resolved.first;
    }
}