blob: 72e818eab9c66cb445e0597d834a181b24c3a43e [file] [log] [blame]
/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.file;
import java.util.List;
import edu.uci.ics.asterix.common.api.ILocalResourceMetadata;
import edu.uci.ics.asterix.common.config.AsterixStorageProperties;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.config.IAsterixPropertiesProvider;
import edu.uci.ics.asterix.common.context.AsterixVirtualBufferCacheProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.ioopcallbacks.LSMRTreeIOOperationCallbackFactory;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import edu.uci.ics.asterix.transaction.management.resource.LSMRTreeLocalResourceMetadata;
import edu.uci.ics.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider;
import edu.uci.ics.asterix.transaction.management.service.transaction.AsterixRuntimeComponentsProvider;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.algebricks.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.impls.BTree;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexCreateOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.lsm.rtree.dataflow.LSMRTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.frames.RTreePolicyType;
import edu.uci.ics.hyracks.storage.common.file.ILocalResourceFactoryProvider;
import edu.uci.ics.hyracks.storage.common.file.LocalResource;
@SuppressWarnings("rawtypes")
public class SecondaryRTreeOperationsHelper extends SecondaryIndexOperationsHelper {
protected IPrimitiveValueProviderFactory[] valueProviderFactories;
protected int numNestedSecondaryKeyFields;
protected ATypeTag keyType;
protected SecondaryRTreeOperationsHelper(PhysicalOptimizationConfig physOptConf,
IAsterixPropertiesProvider propertiesProvider) {
super(physOptConf, propertiesProvider);
}
@Override
public JobSpecification buildCreationJobSpec() throws AsterixException, AlgebricksException {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
//prepare a LocalResourceMetadata which will be stored in NC's local resource repository
ILocalResourceMetadata localResourceMetadata = new LSMRTreeLocalResourceMetadata(
secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, primaryComparatorFactories,
valueProviderFactories, RTreePolicyType.RTREE, AqlMetadataProvider.proposeLinearizer(keyType,
secondaryComparatorFactories.length), dataset.getDatasetId(), mergePolicyFactory,
mergePolicyFactoryProperties);
ILocalResourceFactoryProvider localResourceFactoryProvider = new PersistentLocalResourceFactoryProvider(
localResourceMetadata, LocalResource.LSMRTreeResource);
TreeIndexCreateOperatorDescriptor secondaryIndexCreateOp = new TreeIndexCreateOperatorDescriptor(spec,
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories, null,
new LSMRTreeDataflowHelperFactory(valueProviderFactories, RTreePolicyType.RTREE,
primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()),
mergePolicyFactory, mergePolicyFactoryProperties, new SecondaryIndexOperationTrackerProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMRTreeIOOperationCallbackFactory.INSTANCE, AqlMetadataProvider.proposeLinearizer(keyType,
secondaryComparatorFactories.length), storageProperties
.getBloomFilterFalsePositiveRate()), localResourceFactoryProvider,
NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryIndexCreateOp,
secondaryPartitionConstraint);
spec.addRoot(secondaryIndexCreateOp);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
}
@Override
protected void setSecondaryRecDescAndComparators(IndexType indexType, List<String> secondaryKeyFields,
int gramLength, AqlMetadataProvider metadata) throws AlgebricksException, AsterixException {
int numSecondaryKeys = secondaryKeyFields.size();
if (numSecondaryKeys != 1) {
throw new AsterixException(
"Cannot use "
+ numSecondaryKeys
+ " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
}
Pair<IAType, Boolean> spatialTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), itemType);
IAType spatialType = spatialTypePair.first;
anySecondaryKeyIsNullable = spatialTypePair.second;
if (spatialType == null) {
throw new AsterixException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
}
int numDimensions = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
numNestedSecondaryKeyFields = numDimensions * 2;
secondaryFieldAccessEvalFactories = metadata.getFormat().createMBRFactory(itemType, secondaryKeyFields.get(0),
numPrimaryKeys, numDimensions);
secondaryComparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
ISerializerDeserializer[] secondaryRecFields = new ISerializerDeserializer[numPrimaryKeys
+ numNestedSecondaryKeyFields];
ITypeTraits[] secondaryTypeTraits = new ITypeTraits[numNestedSecondaryKeyFields + numPrimaryKeys];
IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
keyType = nestedKeyType.getTypeTag();
for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(nestedKeyType);
secondaryRecFields[i] = keySerde;
secondaryComparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
nestedKeyType, true);
secondaryTypeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
}
// Add serializers and comparators for primary index fields.
for (int i = 0; i < numPrimaryKeys; i++) {
secondaryRecFields[numNestedSecondaryKeyFields + i] = primaryRecDesc.getFields()[i];
secondaryTypeTraits[numNestedSecondaryKeyFields + i] = primaryRecDesc.getTypeTraits()[i];
}
secondaryRecDesc = new RecordDescriptor(secondaryRecFields, secondaryTypeTraits);
}
@Override
public JobSpecification buildLoadingJobSpec() throws AsterixException, AlgebricksException {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
// Create dummy key provider for feeding the primary index scan.
AbstractOperatorDescriptor keyProviderOp = createDummyKeyProviderOp(spec);
// Create primary index scan op.
BTreeSearchOperatorDescriptor primaryScanOp = createPrimaryIndexScanOp(spec);
// Assign op.
AlgebricksMetaOperatorDescriptor asterixAssignOp = createAssignOp(spec, primaryScanOp,
numNestedSecondaryKeyFields);
// If any of the secondary fields are nullable, then add a select op that filters nulls.
AlgebricksMetaOperatorDescriptor selectOp = null;
if (anySecondaryKeyIsNullable) {
selectOp = createFilterNullsSelectOp(spec, numNestedSecondaryKeyFields);
}
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
// Create secondary RTree bulk load op.
TreeIndexBulkLoadOperatorDescriptor secondaryBulkLoadOp = createTreeIndexBulkLoadOp(spec,
numNestedSecondaryKeyFields, new LSMRTreeDataflowHelperFactory(valueProviderFactories,
RTreePolicyType.RTREE, primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
AqlMetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate()), BTree.DEFAULT_FILL_FACTOR);
// Connect the operators.
spec.connect(new OneToOneConnectorDescriptor(spec), keyProviderOp, 0, primaryScanOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), primaryScanOp, 0, asterixAssignOp, 0);
if (anySecondaryKeyIsNullable) {
spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, selectOp, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), selectOp, 0, secondaryBulkLoadOp, 0);
} else {
spec.connect(new OneToOneConnectorDescriptor(spec), asterixAssignOp, 0, secondaryBulkLoadOp, 0);
}
spec.addRoot(secondaryBulkLoadOp);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
}
@Override
public JobSpecification buildCompactJobSpec() throws AsterixException, AlgebricksException {
JobSpecification spec = JobSpecificationUtils.createJobSpecification();
AsterixStorageProperties storageProperties = propertiesProvider.getStorageProperties();
LSMTreeIndexCompactOperatorDescriptor compactOp = new LSMTreeIndexCompactOperatorDescriptor(spec,
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
secondaryFileSplitProvider, secondaryRecDesc.getTypeTraits(), secondaryComparatorFactories,
secondaryBloomFilterKeyFields, new LSMRTreeDataflowHelperFactory(valueProviderFactories,
RTreePolicyType.RTREE, primaryComparatorFactories, new AsterixVirtualBufferCacheProvider(
dataset.getDatasetId()), mergePolicyFactory, mergePolicyFactoryProperties,
new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
AqlMetadataProvider.proposeLinearizer(keyType, secondaryComparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate()), NoOpOperationCallbackFactory.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
secondaryPartitionConstraint);
spec.addRoot(compactOp);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
return spec;
}
}