| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.asterix.metadata.declared; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.function.Function; |
| |
| import org.apache.asterix.common.cluster.IClusterStateManager; |
| import org.apache.asterix.common.config.DatasetConfig.DatasetType; |
| import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; |
| import org.apache.asterix.common.config.DatasetConfig.IndexType; |
| import org.apache.asterix.common.config.StorageProperties; |
| import org.apache.asterix.common.context.IStorageComponentProvider; |
| import org.apache.asterix.common.dataflow.ICcApplicationContext; |
| import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor; |
| import org.apache.asterix.common.exceptions.AsterixException; |
| import org.apache.asterix.common.metadata.DataverseName; |
| import org.apache.asterix.common.metadata.LockList; |
| import org.apache.asterix.common.storage.ICompressionManager; |
| import org.apache.asterix.common.transactions.ITxnIdFactory; |
| import org.apache.asterix.common.transactions.TxnId; |
| import org.apache.asterix.common.utils.StorageConstants; |
| import org.apache.asterix.common.utils.StoragePathUtil; |
| import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; |
| import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil; |
| import org.apache.asterix.external.adapter.factory.LookupAdapterFactory; |
| import org.apache.asterix.external.api.IAdapterFactory; |
| import org.apache.asterix.external.api.IDataSourceAdapter; |
| import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; |
| import org.apache.asterix.external.indexing.ExternalFile; |
| import org.apache.asterix.external.indexing.IndexingConstants; |
| import org.apache.asterix.external.operators.ExternalBTreeSearchOperatorDescriptor; |
| import org.apache.asterix.external.operators.ExternalLookupOperatorDescriptor; |
| import org.apache.asterix.external.operators.ExternalRTreeSearchOperatorDescriptor; |
| import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor; |
| import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor; |
| import org.apache.asterix.external.provider.AdapterFactoryProvider; |
| import org.apache.asterix.external.util.ExternalDataConstants; |
| import org.apache.asterix.external.util.FeedConstants; |
| import org.apache.asterix.formats.base.IDataFormat; |
| import org.apache.asterix.formats.nontagged.BinaryBooleanInspector; |
| import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider; |
| import org.apache.asterix.formats.nontagged.LinearizeComparatorFactoryProvider; |
| import org.apache.asterix.formats.nontagged.TypeTraitProvider; |
| import org.apache.asterix.metadata.MetadataManager; |
| import org.apache.asterix.metadata.MetadataTransactionContext; |
| import org.apache.asterix.metadata.api.ICCExtensionManager; |
| import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities; |
| import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetCardinalityHint; |
| import org.apache.asterix.metadata.entities.Dataset; |
| import org.apache.asterix.metadata.entities.DatasourceAdapter; |
| import org.apache.asterix.metadata.entities.Dataverse; |
| import org.apache.asterix.metadata.entities.ExternalDatasetDetails; |
| import org.apache.asterix.metadata.entities.Feed; |
| import org.apache.asterix.metadata.entities.FeedConnection; |
| import org.apache.asterix.metadata.entities.FeedPolicyEntity; |
| import org.apache.asterix.metadata.entities.Index; |
| import org.apache.asterix.metadata.entities.Synonym; |
| import org.apache.asterix.metadata.feeds.FeedMetadataUtil; |
| import org.apache.asterix.metadata.lock.ExternalDatasetsRegistry; |
| import org.apache.asterix.metadata.utils.DatasetUtil; |
| import org.apache.asterix.metadata.utils.MetadataConstants; |
| import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; |
| import org.apache.asterix.om.functions.BuiltinFunctions; |
| import org.apache.asterix.om.functions.IFunctionExtensionManager; |
| import org.apache.asterix.om.functions.IFunctionManager; |
| import org.apache.asterix.om.types.ARecordType; |
| import org.apache.asterix.om.types.ATypeTag; |
| import org.apache.asterix.om.types.IAType; |
| import org.apache.asterix.om.utils.NonTaggedFormatUtil; |
| import org.apache.asterix.runtime.base.AsterixTupleFilterFactory; |
| import org.apache.asterix.runtime.formats.FormatUtils; |
| import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor; |
| import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage; |
| import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorDescriptor; |
| import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor; |
| import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; |
| import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; |
| import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; |
| import org.apache.hyracks.algebricks.common.utils.Pair; |
| import org.apache.hyracks.algebricks.common.utils.Triple; |
| import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; |
| import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; |
| import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; |
| import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; |
| import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; |
| import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo; |
| import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink; |
| import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource; |
| import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex; |
| import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; |
| import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; |
| import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; |
| import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; |
| import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; |
| import org.apache.hyracks.algebricks.data.IAWriterFactory; |
| import org.apache.hyracks.algebricks.data.IPrinterFactory; |
| import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider; |
| import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; |
| import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; |
| import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; |
| import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory; |
| import org.apache.hyracks.api.dataflow.IOperatorDescriptor; |
| import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; |
| import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory; |
| import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory; |
| import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; |
| import org.apache.hyracks.api.dataflow.value.ITypeTraits; |
| import org.apache.hyracks.api.dataflow.value.RecordDescriptor; |
| import org.apache.hyracks.api.io.FileSplit; |
| import org.apache.hyracks.api.job.JobSpecification; |
| import org.apache.hyracks.api.result.IResultMetadata; |
| import org.apache.hyracks.api.result.ResultSetId; |
| import org.apache.hyracks.data.std.primitive.ShortPointable; |
| import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer; |
| import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; |
| import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; |
| import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; |
| import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; |
| import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; |
| import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory; |
| import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; |
| import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; |
| import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; |
| import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor; |
| import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory; |
| import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor; |
| import org.apache.hyracks.storage.common.IStorageManager; |
| |
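| /** |
| * The compiler's gateway to metadata: resolves datasets, data types, indexes, feeds, |
| * adapters, and synonyms, and builds the Hyracks operator descriptors (scans, index |
| * searches, inserts/deletes/upserts, bulk loads) that operate on them during job generation. |
| */ |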
| public class MetadataProvider implements IMetadataProvider<DataSourceId, String> { |
| |
| private final ICcApplicationContext appCtx; |
| private final IStorageComponentProvider storageComponentProvider; |
| private final StorageProperties storageProperties; |
| private final IFunctionManager functionManager; |
| private final LockList locks; |
| private final Map<String, Object> config; |
| private final Set<Dataset> txnAccessedDatasets; |
| |
| private Dataverse defaultDataverse; |
| private MetadataTransactionContext mdTxnCtx; |
| private boolean isWriteTransaction; |
| private IAWriterFactory writerFactory; |
| private FileSplit outputFile; |
| private boolean asyncResults; |
| private long maxResultReads; |
| private ResultSetId resultSetId; |
| private IResultSerializerFactoryProvider resultSerializerFactoryProvider; |
| private TxnId txnId; |
| private Map<String, Integer> externalDataLocks; |
| private boolean blockingOperatorDisabled = false; |
| |
| public static MetadataProvider create(ICcApplicationContext appCtx, Dataverse defaultDataverse) { |
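| // An extension may register its own MetadataProvider factory; fall back to the base |
| // implementation when none is provided. |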
| Function<ICcApplicationContext, IMetadataProvider<?, ?>> factory = |
| ((ICCExtensionManager) appCtx.getExtensionManager()).getMetadataProviderFactory(); |
| MetadataProvider mp = factory != null ? (MetadataProvider) factory.apply(appCtx) : new MetadataProvider(appCtx); |
| mp.setDefaultDataverse(defaultDataverse); |
| return mp; |
| } |
| |
| protected MetadataProvider(ICcApplicationContext appCtx) { |
| this.appCtx = appCtx; |
| this.storageComponentProvider = appCtx.getStorageComponentProvider(); |
| storageProperties = appCtx.getStorageProperties(); |
| functionManager = ((IFunctionExtensionManager) appCtx.getExtensionManager()).getFunctionManager(); |
| locks = new LockList(); |
| config = new HashMap<>(); |
| txnAccessedDatasets = new HashSet<>(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public <T> T getProperty(String name) { |
| return (T) config.get(name); |
| } |
| |
| public void setProperty(String name, Object value) { |
| config.put(name, value); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public <T> T removeProperty(String name) { |
| return (T) config.remove(name); |
| } |
| |
| public boolean getBooleanProperty(String name, boolean defaultValue) { |
| Object v = config.get(name); |
| return v != null ? Boolean.parseBoolean(String.valueOf(v)) : defaultValue; |
| } |
| |
| public void disableBlockingOperator() { |
| blockingOperatorDisabled = true; |
| } |
| |
| public boolean isBlockingOperatorDisabled() { |
| return blockingOperatorDisabled; |
| } |
| |
| @Override |
| public Map<String, Object> getConfig() { |
| return config; |
| } |
| |
| public void setTxnId(TxnId txnId) { |
| this.txnId = txnId; |
| } |
| |
| public void setDefaultDataverse(Dataverse defaultDataverse) { |
| this.defaultDataverse = defaultDataverse == null ? MetadataBuiltinEntities.DEFAULT_DATAVERSE : defaultDataverse; |
| } |
| |
| public Dataverse getDefaultDataverse() { |
| return defaultDataverse; |
| } |
| |
| public DataverseName getDefaultDataverseName() { |
| return defaultDataverse.getDataverseName(); |
| } |
| |
| public void setWriteTransaction(boolean writeTransaction) { |
| this.isWriteTransaction = writeTransaction; |
| } |
| |
| public void setWriterFactory(IAWriterFactory writerFactory) { |
| this.writerFactory = writerFactory; |
| } |
| |
| public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) { |
| this.mdTxnCtx = mdTxnCtx; |
| txnAccessedDatasets.clear(); |
| } |
| |
| public MetadataTransactionContext getMetadataTxnContext() { |
| return mdTxnCtx; |
| } |
| |
| public IAWriterFactory getWriterFactory() { |
| return this.writerFactory; |
| } |
| |
| public FileSplit getOutputFile() { |
| return outputFile; |
| } |
| |
| public void setOutputFile(FileSplit outputFile) { |
| this.outputFile = outputFile; |
| } |
| |
| public boolean getResultAsyncMode() { |
| return asyncResults; |
| } |
| |
| public void setResultAsyncMode(boolean asyncResults) { |
| this.asyncResults = asyncResults; |
| } |
| |
| public void setMaxResultReads(long maxResultReads) { |
| this.maxResultReads = maxResultReads; |
| } |
| |
| public long getMaxResultReads() { |
| return maxResultReads; |
| } |
| |
| public ResultSetId getResultSetId() { |
| return resultSetId; |
| } |
| |
| public void setResultSetId(ResultSetId resultSetId) { |
| this.resultSetId = resultSetId; |
| } |
| |
| public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider rafp) { |
| this.resultSerializerFactoryProvider = rafp; |
| } |
| |
| public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() { |
| return resultSerializerFactoryProvider; |
| } |
| |
| public boolean isWriteTransaction() { |
| // The transaction writes persistent datasets. |
| return isWriteTransaction; |
| } |
| |
| public IFunctionManager getFunctionManager() { |
| return functionManager; |
| } |
| |
| public IDataFormat getDataFormat() { |
| return FormatUtils.getDefaultFormat(); |
| } |
| |
| public StorageProperties getStorageProperties() { |
| return storageProperties; |
| } |
| |
| public Map<String, Integer> getExternalDataLocks() { |
| return externalDataLocks; |
| } |
| |
| public void setExternalDataLocks(Map<String, Integer> locks) { |
| this.externalDataLocks = locks; |
| } |
| |
| private DataverseName getActiveDataverseName(DataverseName dataverseName) { |
| return dataverseName != null ? dataverseName |
| : defaultDataverse != null ? defaultDataverse.getDataverseName() : null; |
| } |
| |
| /** |
| * Retrieves the output record type, as defined by "set output-record-type". |
| */ |
| public ARecordType findOutputRecordType() throws AlgebricksException { |
| return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(), |
| getProperty("output-record-type")); |
| } |
| |
| public Dataset findDataset(DataverseName dataverseName, String datasetName) throws AlgebricksException { |
| DataverseName dvName = getActiveDataverseName(dataverseName); |
| if (dvName == null) { |
| return null; |
| } |
| appCtx.getMetadataLockManager().acquireDataverseReadLock(locks, dvName); |
| appCtx.getMetadataLockManager().acquireDatasetReadLock(locks, dvName, datasetName); |
| return MetadataManagerUtil.findDataset(mdTxnCtx, dvName, datasetName); |
| } |
| |
| public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException { |
| return MetadataManagerUtil.findNodeDomain(appCtx.getClusterStateManager(), mdTxnCtx, nodeGroupName); |
| } |
| |
| public List<String> findNodes(String nodeGroupName) throws AlgebricksException { |
| return MetadataManagerUtil.findNodes(mdTxnCtx, nodeGroupName); |
| } |
| |
| public IAType findType(DataverseName dataverseName, String typeName) throws AlgebricksException { |
| return MetadataManagerUtil.findType(mdTxnCtx, dataverseName, typeName); |
| } |
| |
| public IAType findType(Dataset dataset) throws AlgebricksException { |
| return findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); |
| } |
| |
| public IAType findMetaType(Dataset dataset) throws AlgebricksException { |
| return findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName()); |
| } |
| |
| public Feed findFeed(DataverseName dataverseName, String feedName) throws AlgebricksException { |
| return MetadataManagerUtil.findFeed(mdTxnCtx, dataverseName, feedName); |
| } |
| |
| public FeedConnection findFeedConnection(DataverseName dataverseName, String feedName, String datasetName) |
| throws AlgebricksException { |
| return MetadataManagerUtil.findFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName); |
| } |
| |
| public FeedPolicyEntity findFeedPolicy(DataverseName dataverseName, String policyName) throws AlgebricksException { |
| return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverseName, policyName); |
| } |
| |
| @Override |
| public DataSource findDataSource(DataSourceId id) throws AlgebricksException { |
| return MetadataManagerUtil.findDataSource(appCtx.getClusterStateManager(), mdTxnCtx, id); |
| } |
| |
| public DataSource lookupSourceInMetadata(DataSourceId aqlId) throws AlgebricksException { |
| return MetadataManagerUtil.lookupSourceInMetadata(appCtx.getClusterStateManager(), mdTxnCtx, aqlId); |
| } |
| |
| @Override |
| public IDataSourceIndex<String, DataSourceId> findDataSourceIndex(String indexId, DataSourceId dataSourceId) |
| throws AlgebricksException { |
| DataSource source = findDataSource(dataSourceId); |
| Dataset dataset = ((DatasetDataSource) source).getDataset(); |
| Index secondaryIndex = getIndex(dataset.getDataverseName(), dataset.getDatasetName(), indexId); |
| return (secondaryIndex != null) |
| ? new DataSourceIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this) |
| : null; |
| } |
| |
| public Index getIndex(DataverseName dataverseName, String datasetName, String indexName) |
| throws AlgebricksException { |
| return MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName); |
| } |
| |
| public List<Index> getDatasetIndexes(DataverseName dataverseName, String datasetName) throws AlgebricksException { |
| return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); |
| } |
| |
| public Pair<DataverseName, String> resolveDatasetNameUsingSynonyms(DataverseName dataverseName, String datasetName) |
| throws AlgebricksException { |
| DataverseName dvName = getActiveDataverseName(dataverseName); |
| if (dvName == null) { |
| return null; |
| } |
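| // Follow the synonym chain until the name resolves to an existing dataset; a name that is |
| // neither a dataset nor a synonym resolves to null. |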
| while (MetadataManagerUtil.findDataset(mdTxnCtx, dvName, datasetName) == null) { |
| Synonym synonym = findSynonym(dvName, datasetName); |
| if (synonym == null) { |
| return null; |
| } |
| dvName = synonym.getObjectDataverseName(); |
| datasetName = synonym.getObjectName(); |
| } |
| return new Pair<>(dvName, datasetName); |
| } |
| |
| public Synonym findSynonym(DataverseName dataverseName, String synonymName) throws AlgebricksException { |
| return MetadataManagerUtil.findSynonym(mdTxnCtx, dataverseName, synonymName); |
| } |
| |
| @Override |
| public IFunctionInfo lookupFunction(FunctionIdentifier fid) { |
| return BuiltinFunctions.lookupFunction(fid); |
| } |
| |
| @Override |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime( |
| IDataSource<DataSourceId> dataSource, List<LogicalVariable> scanVariables, |
| List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars, |
| List<LogicalVariable> maxFilterVars, ITupleFilterFactory tupleFilterFactory, long outputLimit, |
| IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, |
| Object implConfig) throws AlgebricksException { |
| return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables, projectVariables, |
| projectPushed, minFilterVars, maxFilterVars, tupleFilterFactory, outputLimit, opSchema, typeEnv, |
| context, jobSpec, implConfig); |
| } |
| |
| protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildLoadableDatasetScan( |
| JobSpecification jobSpec, IAdapterFactory adapterFactory, RecordDescriptor rDesc) |
| throws AlgebricksException { |
| ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory); |
| try { |
| return new Pair<>(dataScanner, adapterFactory.getPartitionConstraint()); |
| } catch (Exception e) { |
| throw new AlgebricksException(e); |
| } |
| } |
| |
| public Dataverse findDataverse(DataverseName dataverseName) throws AlgebricksException { |
| return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName); |
| } |
| |
| public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, IAdapterFactory> buildFeedIntakeRuntime( |
| JobSpecification jobSpec, Feed feed, FeedPolicyAccessor policyAccessor) throws Exception { |
| Triple<IAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput; |
| factoryOutput = |
| FeedMetadataUtil.getFeedFactoryAndOutput(feed, policyAccessor, mdTxnCtx, getApplicationContext()); |
| ARecordType recordType = |
| FeedMetadataUtil.getOutputType(feed, feed.getConfiguration().get(ExternalDataConstants.KEY_TYPE_NAME)); |
| IAdapterFactory adapterFactory = factoryOutput.first; |
| FeedIntakeOperatorDescriptor feedIngestor = null; |
| switch (factoryOutput.third) { |
| case INTERNAL: |
| feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, adapterFactory, recordType, |
| policyAccessor, factoryOutput.second); |
| break; |
| case EXTERNAL: |
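| // An external adapter is qualified by its library; take the prefix before the |
| // library-name separator as the library name. |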
| String libraryName = feed.getConfiguration().get(ExternalDataConstants.KEY_ADAPTER_NAME).trim() |
| .split(FeedConstants.NamingConstants.LIBRARY_NAME_SEPARATOR)[0]; |
| feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, libraryName, |
| adapterFactory.getClass().getName(), recordType, policyAccessor, factoryOutput.second); |
| break; |
| default: |
| break; |
| } |
| |
| AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint(); |
| return new Triple<>(feedIngestor, partitionConstraint, adapterFactory); |
| } |
| |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec, |
| IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput, |
| boolean retainMissing, Dataset dataset, String indexName, int[] lowKeyFields, int[] highKeyFields, |
| boolean lowKeyInclusive, boolean highKeyInclusive, boolean propagateFilter, int[] minFilterFieldIndexes, |
| int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit, |
| boolean isIndexOnlyPlan) throws AlgebricksException { |
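| // The primary index is stored under the dataset's own name, so any other index name |
| // denotes a secondary index (external datasets have no primary index). |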
| boolean isSecondary = true; |
| Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), dataset.getDatasetName()); |
| if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) { |
| isSecondary = !indexName.equals(primaryIndex.getIndexName()); |
| } |
| Index theIndex = isSecondary ? MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), indexName) : primaryIndex; |
| int numPrimaryKeys = dataset.getPrimaryKeys().size(); |
| RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = |
| getSplitProviderAndConstraints(dataset, theIndex.getIndexName()); |
| int[] primaryKeyFields = new int[numPrimaryKeys]; |
| for (int i = 0; i < numPrimaryKeys; i++) { |
| primaryKeyFields[i] = i; |
| } |
| |
| int[] primaryKeyFieldsInSecondaryIndex = null; |
| byte[] successValueForIndexOnlyPlan = null; |
| byte[] failValueForIndexOnlyPlan = null; |
| boolean proceedIndexOnlyPlan = isIndexOnlyPlan && isSecondary; |
| if (proceedIndexOnlyPlan) { |
| int numSecondaryKeys = theIndex.getKeyFieldNames().size(); |
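| // In a secondary-index entry the secondary key fields come first, followed by the |
| // primary key fields, hence the offset by numSecondaryKeys. |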
| primaryKeyFieldsInSecondaryIndex = new int[numPrimaryKeys]; |
| for (int i = 0; i < numPrimaryKeys; i++) { |
| primaryKeyFieldsInSecondaryIndex[i] = i + numSecondaryKeys; |
| } |
| // Defines the return value from a secondary index search if this is an index-only plan. |
| failValueForIndexOnlyPlan = SerializerDeserializerUtil.computeByteArrayForIntValue(0); |
| successValueForIndexOnlyPlan = SerializerDeserializerUtil.computeByteArrayForIntValue(1); |
| } |
| |
| ISearchOperationCallbackFactory searchCallbackFactory = |
| dataset.getSearchCallbackFactory(storageComponentProvider, theIndex, IndexOperation.SEARCH, |
| primaryKeyFields, primaryKeyFieldsInSecondaryIndex, proceedIndexOnlyPlan); |
| IStorageManager storageManager = getStorageComponentProvider().getStorageManager(); |
| IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(storageManager, spPc.first); |
| BTreeSearchOperatorDescriptor btreeSearchOp; |
| |
| if (dataset.getDatasetType() == DatasetType.INTERNAL) { |
| btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields, |
| lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing, |
| context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes, |
| maxFilterFieldIndexes, propagateFilter, tupleFilterFactory, outputLimit, proceedIndexOnlyPlan, |
| failValueForIndexOnlyPlan, successValueForIndexOnlyPlan); |
| } else { |
| btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, |
| highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing, |
| context.getMissingWriterFactory(), searchCallbackFactory, minFilterFieldIndexes, |
| maxFilterFieldIndexes, ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this)); |
| } |
| return new Pair<>(btreeSearchOp, spPc.second); |
| } |
| |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec, |
| List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, |
| JobGenContext context, boolean retainInput, boolean retainMissing, Dataset dataset, String indexName, |
| int[] keyFields, boolean propagateFilter, int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, |
| boolean isIndexOnlyPlan) throws AlgebricksException { |
| int numPrimaryKeys = dataset.getPrimaryKeys().size(); |
| Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), indexName); |
| if (secondaryIndex == null) { |
| throw new AlgebricksException( |
| "Code generation error: no index " + indexName + " for dataset " + dataset.getDatasetName()); |
| } |
| RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = |
| getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); |
| int[] primaryKeyFields = new int[numPrimaryKeys]; |
| for (int i = 0; i < numPrimaryKeys; i++) { |
| primaryKeyFields[i] = i; |
| } |
| |
| int[] primaryKeyFieldsInSecondaryIndex = null; |
| byte[] successValueForIndexOnlyPlan = null; |
| byte[] failValueForIndexOnlyPlan = null; |
| if (isIndexOnlyPlan) { |
| ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName()); |
| List<List<String>> secondaryKeyFields = secondaryIndex.getKeyFieldNames(); |
| List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes(); |
| Pair<IAType, Boolean> keyTypePair = |
| Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyFields.get(0), recType); |
| IAType keyType = keyTypePair.first; |
| int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag()); |
| int numNestedSecondaryKeyFields = numDimensions * 2; |
| primaryKeyFieldsInSecondaryIndex = new int[numPrimaryKeys]; |
| for (int i = 0; i < numPrimaryKeys; i++) { |
| primaryKeyFieldsInSecondaryIndex[i] = i + numNestedSecondaryKeyFields; |
| } |
| // Defines the return value from a secondary index search if this is an index-only plan. |
| failValueForIndexOnlyPlan = SerializerDeserializerUtil.computeByteArrayForIntValue(0); |
| successValueForIndexOnlyPlan = SerializerDeserializerUtil.computeByteArrayForIntValue(1); |
| } |
| |
| ISearchOperationCallbackFactory searchCallbackFactory = |
| dataset.getSearchCallbackFactory(storageComponentProvider, secondaryIndex, IndexOperation.SEARCH, |
| primaryKeyFields, primaryKeyFieldsInSecondaryIndex, isIndexOnlyPlan); |
| RTreeSearchOperatorDescriptor rtreeSearchOp; |
| IIndexDataflowHelperFactory indexDataflowHelperFactory = |
| new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first); |
| if (dataset.getDatasetType() == DatasetType.INTERNAL) { |
| rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true, |
| indexDataflowHelperFactory, retainInput, retainMissing, context.getMissingWriterFactory(), |
| searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, propagateFilter, |
| isIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan); |
| } else { |
| // Create the operator |
| rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true, |
| indexDataflowHelperFactory, retainInput, retainMissing, context.getMissingWriterFactory(), |
| searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, |
| ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this)); |
| } |
| |
| return new Pair<>(rtreeSearchOp, spPc.second); |
| } |
| |
| @Override |
| public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink, |
| int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) { |
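| // A file sink is written by a single node, so the writer is pinned to the node that |
| // owns the target file split. |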
| FileSplitDataSink fsds = (FileSplitDataSink) sink; |
| FileSplitSinkId fssi = fsds.getId(); |
| FileSplit fs = fssi.getFileSplit(); |
| File outFile = new File(fs.getPath()); |
| String nodeId = fs.getNodeName(); |
| |
| SinkWriterRuntimeFactory runtime = |
| new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, getWriterFactory(), inputDesc); |
| AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId }); |
| return new Pair<>(runtime, apc); |
| } |
| |
| @Override |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink, |
| int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, |
| IResultMetadata metadata, JobSpecification spec) throws AlgebricksException { |
| ResultSetDataSink rsds = (ResultSetDataSink) sink; |
| ResultSetSinkId rssId = rsds.getId(); |
| ResultSetId rsId = rssId.getResultSetId(); |
| ResultWriterOperatorDescriptor resultWriter = null; |
| try { |
| IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider |
| .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory()); |
| resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, metadata, getResultAsyncMode(), |
| resultSerializedAppenderFactory, getMaxResultReads()); |
| } catch (IOException e) { |
| throw new AlgebricksException(e); |
| } |
| return new Pair<>(resultWriter, null); |
| } |
| |
| @Override |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime( |
| IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys, |
| LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, JobGenContext context, |
| JobSpecification spec) throws AlgebricksException { |
| DataverseName dataverseName = dataSource.getId().getDataverseName(); |
| String datasetName = dataSource.getId().getDatasourceName(); |
| Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); |
| int numKeys = keys.size(); |
| int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; |
| |
| // Move key fields to the front: [keys, record, filter]. |
| int[] fieldPermutation = new int[numKeys + 1 + numFilterFields]; |
| int i = 0; |
| for (LogicalVariable varKey : keys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| i++; |
| } |
| fieldPermutation[numKeys] = propagatedSchema.findVariable(payload); |
| if (numFilterFields > 0) { |
| int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0)); |
| fieldPermutation[numKeys + 1] = idx; |
| } |
| |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = |
| getSplitProviderAndConstraints(dataset); |
| long numElementsHint = getCardinalityPerPartitionHint(dataset); |
| // TODO: figure out the right behavior of the bulkload and then give the right callback |
| // (e.g., what's the expected behavior when there is an error during bulkload?) |
| IIndexDataflowHelperFactory indexHelperFactory = |
| new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); |
| LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new LSMIndexBulkLoadOperatorDescriptor(spec, null, |
| fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, true, |
| indexHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), null); |
| return new Pair<>(btreeBulkLoad, splitsAndConstraint.second); |
| } |
| |
| @Override |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime( |
| IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, |
| List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, |
| List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor inputRecordDesc, JobGenContext context, |
| JobSpecification spec, boolean bulkload) throws AlgebricksException { |
| return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload, |
| additionalNonKeyFields, inputRecordDesc, context, spec, bulkload, additionalNonFilteringFields); |
| } |
| |
| @Override |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime( |
| IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv, |
| List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, |
| RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec) throws AlgebricksException { |
| return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload, |
| additionalNonKeyFields, inputRecordDesc, context, spec, false, null); |
| } |
| |
| @Override |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime( |
| IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema, |
| IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, |
| List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields, |
| ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, |
| boolean bulkload) throws AlgebricksException { |
| return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, |
| inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, |
| context, spec, bulkload, null, null, null); |
| } |
| |
| @Override |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime( |
| IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema, |
| IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, |
| List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields, |
| ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec) |
| throws AlgebricksException { |
| return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, |
| inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, recordDesc, |
| context, spec, false, null, null, null); |
| } |
| |
| @Override |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime( |
| IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema, |
| IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, |
| List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys, |
| ILogicalExpression filterExpr, LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys, |
| LogicalVariable prevAdditionalFilteringKey, RecordDescriptor recordDesc, JobGenContext context, |
| JobSpecification spec) throws AlgebricksException { |
| return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema, |
| inputSchemas, typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, recordDesc, |
| context, spec, false, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKey); |
| } |
| |
| @Override |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime( |
| IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema, |
| IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys, |
| List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc, |
| JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException { |
| |
| String indexName = dataSourceIndex.getId(); |
| DataverseName dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName(); |
| String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName(); |
| |
| IOperatorSchema inputSchema; |
| if (inputSchemas.length > 0) { |
| inputSchema = inputSchemas[0]; |
| } else { |
| throw new AlgebricksException("TokenizeOperator can not operate without any input variable."); |
| } |
| |
| Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); |
| Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), indexName); |
| // TokenizeOperator only supports a keyword or n-gram index. |
| switch (secondaryIndex.getIndexType()) { |
| case SINGLE_PARTITION_WORD_INVIX: |
| case SINGLE_PARTITION_NGRAM_INVIX: |
| case LENGTH_PARTITIONED_WORD_INVIX: |
| case LENGTH_PARTITIONED_NGRAM_INVIX: |
| return getBinaryTokenizerRuntime(dataverseName, datasetName, indexName, inputSchema, propagatedSchema, |
| primaryKeys, secondaryKeys, recordDesc, spec, secondaryIndex.getIndexType()); |
| default: |
| throw new AlgebricksException("Currently, we do not support TokenizeOperator for the index type: " |
| + secondaryIndex.getIndexType()); |
| } |
| } |
| |
| /** |
| * Calculates an estimated bloom filter size. Note that this estimate assumes that the data |
| * is uniformly distributed across all partitions. For example, a cardinality hint of |
| * 1,000,000 over a node group with 4 partitions yields 250,000 elements per partition. |
| * |
| * @param dataset the dataset whose cardinality hint is consulted |
| * @return number of elements that will be used to create a bloom filter per |
| *         dataset per partition |
| * @throws AlgebricksException |
| */ |
| public long getCardinalityPerPartitionHint(Dataset dataset) throws AlgebricksException { |
| String numElementsHintString = dataset.getHints().get(DatasetCardinalityHint.NAME); |
| long numElementsHint; |
| if (numElementsHintString == null) { |
| numElementsHint = DatasetCardinalityHint.DEFAULT; |
| } else { |
| numElementsHint = Long.parseLong(numElementsHintString); |
| } |
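| // Divide the dataset-level hint evenly across all storage partitions of the dataset's |
| // node group. |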
| int numPartitions = 0; |
| List<String> nodeGroup = |
| MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, dataset.getNodeGroupName()).getNodeNames(); |
| IClusterStateManager csm = appCtx.getClusterStateManager(); |
| for (String nd : nodeGroup) { |
| numPartitions += csm.getNodePartitionsCount(nd); |
| } |
| return numElementsHint / numPartitions; |
| } |
| |
| protected IAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName, |
| Map<String, String> configuration, ARecordType itemType, ARecordType metaType) throws AlgebricksException { |
| try { |
| configuration.put(ExternalDataConstants.KEY_DATAVERSE, dataset.getDataverseName().getCanonicalForm()); |
| IAdapterFactory adapterFactory = AdapterFactoryProvider.getAdapterFactory( |
| getApplicationContext().getServiceContext(), adapterName, configuration, itemType, metaType); |
| |
| // check to see if dataset is indexed |
| Index filesIndex = |
| MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName(), |
| dataset.getDatasetName().concat(IndexingConstants.EXTERNAL_FILE_INDEX_NAME_SUFFIX)); |
| |
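| // A pending op of 0 means the files index is usable (not mid-create or mid-drop). |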
| if (filesIndex != null && filesIndex.getPendingOp() == 0) { |
| // get files |
| List<ExternalFile> files = MetadataManager.INSTANCE.getDatasetExternalFiles(mdTxnCtx, dataset); |
| Iterator<ExternalFile> iterator = files.iterator(); |
| while (iterator.hasNext()) { |
| if (iterator.next().getPendingOp() != ExternalFilePendingOp.NO_OP) { |
| iterator.remove(); |
| } |
| } |
| } |
| |
| return adapterFactory; |
| } catch (Exception e) { |
| throw new AlgebricksException("Unable to create adapter", e); |
| } |
| } |
| |
| public TxnId getTxnId() { |
| return txnId; |
| } |
| |
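| /** |
| * Proposes a linearizing comparator for R-tree keys; the key is assumed to consist of |
| * min/max coordinate pairs, so the point dimension is {@code numKeyFields / 2}. |
| */ |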
| public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields) |
| throws AlgebricksException { |
| return LinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true, |
| numKeyFields / 2); |
| } |
| |
| public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitAndConstraints(DataverseName dataverseName) { |
| return SplitsAndConstraintsUtil.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), |
| dataverseName); |
| } |
| |
| public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName) |
| throws AlgebricksException { |
| return SplitsAndConstraintsUtil.getIndexSplits(dataset, indexName, mdTxnCtx, appCtx.getClusterStateManager()); |
| } |
| |
| public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName, |
| String adapterName) throws AlgebricksException { |
| DatasourceAdapter adapter; |
| // search in default namespace (built-in adapter) |
| adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName); |
| |
| // search in dataverse (user-defined adapter) |
| if (adapter == null) { |
| adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName); |
| } |
| return adapter; |
| } |
| |
| public AlgebricksAbsolutePartitionConstraint getClusterLocations() { |
| return appCtx.getClusterStateManager().getClusterLocations(); |
| } |
| |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime( |
| JobSpecification jobSpec, Dataset dataset, int[] ridIndexes, boolean retainInput, |
| IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema, JobGenContext context, |
| MetadataProvider metadataProvider, boolean retainMissing) throws AlgebricksException { |
| try { |
| // Get data type |
| ARecordType itemType = |
| (ARecordType) MetadataManager.INSTANCE.getDatatype(metadataProvider.getMetadataTxnContext(), |
| dataset.getDataverseName(), dataset.getItemTypeName()).getDatatype(); |
| ExternalDatasetDetails datasetDetails = (ExternalDatasetDetails) dataset.getDatasetDetails(); |
| LookupAdapterFactory<?> adapterFactory = AdapterFactoryProvider.getLookupAdapterFactory( |
| getApplicationContext().getServiceContext(), datasetDetails.getProperties(), itemType, ridIndexes, |
| retainInput, retainMissing, context.getMissingWriterFactory()); |
| String fileIndexName = IndexingConstants.getFilesIndexName(dataset.getDatasetName()); |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = |
| metadataProvider.getSplitProviderAndConstraints(dataset, fileIndexName); |
| Index fileIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), fileIndexName); |
| // Create the file index data flow helper |
| IIndexDataflowHelperFactory indexDataflowHelperFactory = |
| new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), spPc.first); |
| // Create the out record descriptor, appContext and fileSplitProvider for the |
| // files index |
| RecordDescriptor outRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context); |
| ISearchOperationCallbackFactory searchOpCallbackFactory = |
| dataset.getSearchCallbackFactory(storageComponentProvider, fileIndex, IndexOperation.SEARCH, null); |
| // Create the operator |
| ExternalLookupOperatorDescriptor op = new ExternalLookupOperatorDescriptor(jobSpec, adapterFactory, |
| outRecDesc, indexDataflowHelperFactory, searchOpCallbackFactory, |
| ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this)); |
| return new Pair<>(op, spPc.second); |
| } catch (Exception e) { |
| throw new AlgebricksException(e); |
| } |
| } |
| |
| @Override |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime( |
| IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv, |
| List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys, |
| List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context, |
| JobSpecification spec) throws AlgebricksException { |
| String datasetName = dataSource.getId().getDatasourceName(); |
| Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName); |
| if (dataset == null) { |
| throw new AlgebricksException( |
| "Unknown dataset " + datasetName + " in dataverse " + dataSource.getId().getDataverseName()); |
| } |
| int numKeys = primaryKeys.size(); |
| int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; |
| int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size(); |
| // Move key fields to front. [keys, record, filters] |
| int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields]; |
| int[] bloomFilterKeyFields = new int[numKeys]; |
| int i = 0; |
| // set the keys' permutations |
| for (LogicalVariable varKey : primaryKeys) { |
| int idx = inputSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| bloomFilterKeyFields[i] = i; |
| i++; |
| } |
| // set the record permutation |
| fieldPermutation[i++] = inputSchema.findVariable(payload); |
| // set the filters' permutations. |
| if (numFilterFields > 0) { |
| int idx = inputSchema.findVariable(filterKeys.get(0)); |
| fieldPermutation[i++] = idx; |
| } |
| |
| if (additionalNonFilterFields != null) { |
| for (LogicalVariable var : additionalNonFilterFields) { |
| int idx = inputSchema.findVariable(var); |
| fieldPermutation[i++] = idx; |
| } |
| } |
| return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation, |
| context.getMissingWriterFactory()); |
| } |
| |
| public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetDataScannerRuntime( |
| JobSpecification jobSpec, IAType itemType, IAdapterFactory adapterFactory) throws AlgebricksException { |
| if (itemType.getTypeTag() != ATypeTag.OBJECT) { |
| throw new AlgebricksException("Can only scan datasets of records."); |
| } |
| |
| ISerializerDeserializer<?> payloadSerde = |
| getDataFormat().getSerdeProvider().getSerializerDeserializer(itemType); |
| RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde }); |
| |
| ExternalScanOperatorDescriptor dataScanner = |
| new ExternalScanOperatorDescriptor(jobSpec, scannerDesc, adapterFactory); |
| |
| AlgebricksPartitionConstraint constraint; |
| try { |
| constraint = adapterFactory.getPartitionConstraint(); |
| } catch (Exception e) { |
| throw new AlgebricksException(e); |
| } |
| |
| return new Pair<>(dataScanner, constraint); |
| } |
| |
| private Pair<IBinaryComparatorFactory[], ITypeTraits[]> getComparatorFactoriesAndTypeTraitsOfSecondaryBTreeIndex( |
| List<List<String>> sidxKeyFieldNames, List<IAType> sidxKeyFieldTypes, List<List<String>> pidxKeyFieldNames, |
| ARecordType recType, DatasetType dsType, boolean hasMeta, List<Integer> primaryIndexKeyIndicators, |
| List<Integer> secondaryIndexIndicators, ARecordType metaType) throws AlgebricksException { |
| |
| IBinaryComparatorFactory[] comparatorFactories; |
| ITypeTraits[] typeTraits; |
| int sidxKeyFieldCount = sidxKeyFieldNames.size(); |
| int pidxKeyFieldCount = pidxKeyFieldNames.size(); |
| typeTraits = new ITypeTraits[sidxKeyFieldCount + pidxKeyFieldCount]; |
| comparatorFactories = new IBinaryComparatorFactory[sidxKeyFieldCount + pidxKeyFieldCount]; |
| |
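| // A secondary-index entry consists of the secondary key fields followed by the primary |
| // key fields; build the comparators and type traits in that order. |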
| int i = 0; |
| for (; i < sidxKeyFieldCount; ++i) { |
| Pair<IAType, Boolean> keyPairType = |
| Index.getNonNullableOpenFieldType(sidxKeyFieldTypes.get(i), sidxKeyFieldNames.get(i), |
| (hasMeta && secondaryIndexIndicators.get(i).intValue() == 1) ? metaType : recType); |
| IAType keyType = keyPairType.first; |
| comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true); |
| typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); |
| } |
| |
| for (int j = 0; j < pidxKeyFieldCount; ++j, ++i) { |
| IAType keyType = null; |
| try { |
| switch (dsType) { |
| case INTERNAL: |
| keyType = (hasMeta && primaryIndexKeyIndicators.get(j).intValue() == 1) |
| ? metaType.getSubFieldType(pidxKeyFieldNames.get(j)) |
| : recType.getSubFieldType(pidxKeyFieldNames.get(j)); |
| break; |
| case EXTERNAL: |
| keyType = IndexingConstants.getFieldType(j); |
| break; |
| default: |
| throw new AlgebricksException("Unknown Dataset Type"); |
| } |
| } catch (AsterixException e) { |
| throw new AlgebricksException(e); |
| } |
| comparatorFactories[i] = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType, true); |
| typeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); |
| } |
| |
| return new Pair<>(comparatorFactories, typeTraits); |
| } |
| |
| private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp, |
| IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys, |
| LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor inputRecordDesc, |
| JobGenContext context, JobSpecification spec, boolean bulkload, |
| List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException { |
| |
| String datasetName = dataSource.getId().getDatasourceName(); |
| Dataset dataset = |
| MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataSource.getId().getDataverseName(), datasetName); |
| int numKeys = keys.size(); |
| int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; |
| // Move key fields to front. |
| int[] fieldPermutation = new int[numKeys + 1 + numFilterFields |
| + (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())]; |
| int[] bloomFilterKeyFields = new int[numKeys]; |
| int i = 0; |
| for (LogicalVariable varKey : keys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| bloomFilterKeyFields[i] = i; |
| i++; |
| } |
| fieldPermutation[i++] = propagatedSchema.findVariable(payload); |
| int[] filterFields = new int[numFilterFields]; |
| if (numFilterFields > 0) { |
| int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0)); |
| fieldPermutation[i++] = idx; |
| filterFields[0] = idx; |
| } |
| if (additionalNonFilteringFields != null) { |
| for (LogicalVariable variable : additionalNonFilteringFields) { |
| int idx = propagatedSchema.findVariable(variable); |
| fieldPermutation[i++] = idx; |
| } |
| } |
| |
| Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), dataset.getDatasetName()); |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = |
| getSplitProviderAndConstraints(dataset); |
| |
| // prepare callback |
| int[] primaryKeyFields = new int[numKeys]; |
| for (i = 0; i < numKeys; i++) { |
| primaryKeyFields[i] = i; |
| } |
| IModificationOperationCallbackFactory modificationCallbackFactory = dataset |
| .getModificationCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields); |
| IIndexDataflowHelperFactory idfh = |
| new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); |
| IOperatorDescriptor op; |
| if (bulkload) { |
| long numElementsHint = getCardinalityPerPartitionHint(dataset); |
| op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, |
| StorageConstants.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null, |
| BulkLoadUsage.LOAD, dataset.getDatasetId(), null); |
| } else { |
| if (indexOp == IndexOperation.INSERT) { |
| ISearchOperationCallbackFactory searchCallbackFactory = dataset |
| .getSearchCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields); |
| |
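| // If the dataset has a dedicated primary-key index, the insert operator uses it (being |
| // smaller than the primary index) to check key uniqueness. |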
| Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE |
| .getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName()).stream() |
| .filter(Index::isPrimaryKeyIndex).findFirst(); |
| IIndexDataflowHelperFactory pkidfh = null; |
| if (primaryKeyIndex.isPresent()) { |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primaryKeySplitsAndConstraint = |
| getSplitProviderAndConstraints(dataset, primaryKeyIndex.get().getIndexName()); |
| pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), |
| primaryKeySplitsAndConstraint.first); |
| } |
| op = new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh, |
| modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields); |
| |
| } else { |
| op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, |
| null, true, modificationCallbackFactory); |
| } |
| } |
| return new Pair<>(op, splitsAndConstraint.second); |
| } |
| |
| private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteOrUpsertRuntime( |
| IndexOperation indexOp, IDataSourceIndex<String, DataSourceId> dataSourceIndex, |
| IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, |
| List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, |
| List<LogicalVariable> additionalNonKeyFields, ILogicalExpression filterExpr, |
| RecordDescriptor inputRecordDesc, JobGenContext context, JobSpecification spec, boolean bulkload, |
| LogicalVariable upsertIndicatorVar, List<LogicalVariable> prevSecondaryKeys, |
| LogicalVariable prevAdditionalFilteringKey) throws AlgebricksException { |
| String indexName = dataSourceIndex.getId(); |
| DataverseName dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName(); |
| String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName(); |
| |
| Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); |
| Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), indexName); |
| |
| ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null; |
| if (indexOp == IndexOperation.UPSERT && prevAdditionalFilteringKey != null) { |
| prevAdditionalFilteringKeys = new ArrayList<>(); |
| prevAdditionalFilteringKeys.add(prevAdditionalFilteringKey); |
| } |
| AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context); |
| switch (secondaryIndex.getIndexType()) { |
| case BTREE: |
| return getBTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys, |
| secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp, |
| bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys); |
| case RTREE: |
| return getRTreeRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys, |
| secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp, |
| bulkload, upsertIndicatorVar, prevSecondaryKeys, prevAdditionalFilteringKeys); |
| case SINGLE_PARTITION_WORD_INVIX: |
| case SINGLE_PARTITION_NGRAM_INVIX: |
| case LENGTH_PARTITIONED_WORD_INVIX: |
| case LENGTH_PARTITIONED_NGRAM_INVIX: |
| return getInvertedIndexRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys, |
| secondaryKeys, additionalNonKeyFields, filterFactory, inputRecordDesc, context, spec, indexOp, |
| secondaryIndex.getIndexType(), bulkload, upsertIndicatorVar, prevSecondaryKeys, |
| prevAdditionalFilteringKeys); |
| default: |
| throw new AlgebricksException(indexOp.name() |
| + " not implemented for index type: " + secondaryIndex.getIndexType()); |
| } |
| } |
| |
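| // Builds the modification runtime for a secondary BTree index. The field |
| // permutation lays the tuple out as [secondary keys, primary keys, optional |
| // filter field]; for UPSERT a second permutation covers the previous record. |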
| private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeRuntime(DataverseName dataverseName, |
| String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys, |
| List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields, |
| AsterixTupleFilterFactory filterFactory, RecordDescriptor inputRecordDesc, JobGenContext context, |
| JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable upsertIndicatorVar, |
| List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys) |
| throws AlgebricksException { |
| Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); |
| int numKeys = primaryKeys.size() + secondaryKeys.size(); |
| int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; |
| |
| // generate field permutations |
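| // Illustrative sketch (assuming one secondary key, two primary keys, and a |
| // filter field): the permutation selects [SK, PK0, PK1, filter] from the |
| // propagated schema by variable position. |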
| int[] fieldPermutation = new int[numKeys + numFilterFields]; |
| int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()]; |
| int i = 0; |
| int j = 0; |
| for (LogicalVariable varKey : secondaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| i++; |
| } |
| for (LogicalVariable varKey : primaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| modificationCallbackPrimaryKeyFields[j] = i; |
| i++; |
| j++; |
| } |
| |
| if (numFilterFields > 0) { |
| int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0)); |
| fieldPermutation[numKeys] = idx; |
| } |
| |
| int[] prevFieldPermutation = null; |
| if (indexOp == IndexOperation.UPSERT) { |
| // generate field permutations for prev record |
| prevFieldPermutation = new int[numKeys + numFilterFields]; |
| int k = 0; |
| for (LogicalVariable varKey : prevSecondaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| prevFieldPermutation[k] = idx; |
| k++; |
| } |
| for (LogicalVariable varKey : primaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| prevFieldPermutation[k] = idx; |
| k++; |
| } |
| // Filter can only be one field! |
| if (numFilterFields > 0) { |
| int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0)); |
| prevFieldPermutation[numKeys] = idx; |
| } |
| } |
| try { |
| // Index parameters. |
| Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), indexName); |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = |
| getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); |
| // prepare callback |
| IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( |
| storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); |
| IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory( |
| storageComponentProvider.getStorageManager(), splitsAndConstraint.first); |
| IOperatorDescriptor op; |
| if (bulkload) { |
| long numElementsHint = getCardinalityPerPartitionHint(dataset); |
| op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, |
| StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null, |
| BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory); |
| } else if (indexOp == IndexOperation.UPSERT) { |
| int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar); |
| op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, |
| filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex, |
| BinaryBooleanInspector.FACTORY, prevFieldPermutation); |
| } else { |
| op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh, |
| filterFactory, false, modificationCallbackFactory); |
| } |
| return new Pair<>(op, splitsAndConstraint.second); |
| } catch (Exception e) { |
| throw new AlgebricksException(e); |
| } |
| } |
| |
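| // Builds the modification runtime for a secondary RTree index. A spatial |
| // secondary key is decomposed into its MBR coordinates, so the index carries |
| // 2 * dimension secondary key fields. |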
| private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeRuntime(DataverseName dataverseName, |
| String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys, |
| List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields, |
| AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc, JobGenContext context, |
| JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable upsertIndicatorVar, |
| List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys) |
| throws AlgebricksException { |
| Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); |
| String itemTypeName = dataset.getItemTypeName(); |
| IAType itemType = MetadataManager.INSTANCE |
| .getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype(); |
| validateRecordType(itemType); |
| ARecordType recType = (ARecordType) itemType; |
| Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), indexName); |
| List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames(); |
| List<IAType> secondaryKeyTypes = secondaryIndex.getKeyFieldTypes(); |
| Pair<IAType, Boolean> keyPairType = |
| Index.getNonNullableOpenFieldType(secondaryKeyTypes.get(0), secondaryKeyExprs.get(0), recType); |
| IAType spatialType = keyPairType.first; |
| int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag()); |
| int numSecondaryKeys = dimension * 2; |
| int numPrimaryKeys = primaryKeys.size(); |
| int numKeys = numSecondaryKeys + numPrimaryKeys; |
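| // Illustrative example: a 2-d POINT key with a single primary key yields |
| // numSecondaryKeys = 4 (the four MBR coordinates) and numKeys = 5. |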
| |
| int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; |
| int[] fieldPermutation = new int[numKeys + numFilterFields]; |
| int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()]; |
| int i = 0; |
| int j = 0; |
| |
| for (LogicalVariable varKey : secondaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| i++; |
| } |
| for (LogicalVariable varKey : primaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| modificationCallbackPrimaryKeyFields[j] = i; |
| i++; |
| j++; |
| } |
| |
| if (numFilterFields > 0) { |
| int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0)); |
| fieldPermutation[numKeys] = idx; |
| } |
| |
| int[] prevFieldPermutation = null; |
| if (indexOp == IndexOperation.UPSERT) { |
| // Get field permutation for previous value |
| prevFieldPermutation = new int[numKeys + numFilterFields]; |
| i = 0; |
| |
| // Previous secondary keys come first. |
| for (LogicalVariable varKey : prevSecondaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| prevFieldPermutation[i] = idx; |
| i++; |
| } |
| // Primary keys occupy the same slots in both permutations; i must stay |
| // fixed here, since k already advances the slot index. |
| for (int k = 0; k < numPrimaryKeys; k++) { |
| prevFieldPermutation[k + i] = fieldPermutation[k + i]; |
| } |
| |
| if (numFilterFields > 0) { |
| int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0)); |
| prevFieldPermutation[numKeys] = idx; |
| } |
| } |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = |
| getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); |
| |
| // prepare callback |
| IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( |
| storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); |
| IIndexDataflowHelperFactory indexDataflowHelperFactory = |
| new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first); |
| IOperatorDescriptor op; |
| if (bulkload) { |
| long numElementsHint = getCardinalityPerPartitionHint(dataset); |
| op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, |
| StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, |
| indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory); |
| } else if (indexOp == IndexOperation.UPSERT) { |
| int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar); |
| op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, |
| indexDataflowHelperFactory, filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex, |
| BinaryBooleanInspector.FACTORY, prevFieldPermutation); |
| } else { |
| op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp, |
| indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory); |
| } |
| return new Pair<>(op, splitsAndConstraint.second); |
| } |
| |
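| // Builds the modification runtime for an inverted (keyword or n-gram) index. |
| // For the length-partitioned variants the secondary-key fields are |
| // [token, number of tokens]; otherwise they are just [token]. |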
| private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexRuntime( |
| DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema, |
| List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, |
| List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory, |
| RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec, IndexOperation indexOp, |
| IndexType indexType, boolean bulkload, LogicalVariable upsertIndicatorVar, |
| List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys) |
| throws AlgebricksException { |
| // Check whether the index is length-partitioned. |
| boolean isPartitioned = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX |
| || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX; |
| |
| // Sanity checks. |
| if (primaryKeys.size() > 1) { |
| throw new AlgebricksException("Cannot create inverted index on dataset with composite primary key."); |
| } |
| // secondaryKeys may contain two entries when the index receives input |
| // from its TokenizeOperator: [token, number of tokens] |
| if ((secondaryKeys.size() > 1 && !isPartitioned) || (secondaryKeys.size() > 2 && isPartitioned)) { |
| throw new AlgebricksException("Cannot create composite inverted index on multiple fields."); |
| } |
| Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); |
| // For tokenization, sorting, and loading: one token (+ an optional |
| // partitioning field) + primary keys, i.e. [token, number of tokens, PK]. |
| int numKeys = primaryKeys.size() + secondaryKeys.size(); |
| int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1; |
| |
| // generate field permutations |
| int[] fieldPermutation = new int[numKeys + numFilterFields]; |
| int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()]; |
| int i = 0; |
| int j = 0; |
| |
| // If the index is partitioned: [token, number of tokens] |
| // Otherwise: [token] |
| for (LogicalVariable varKey : secondaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| i++; |
| } |
| for (LogicalVariable varKey : primaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| modificationCallbackPrimaryKeyFields[j] = i; |
| i++; |
| j++; |
| } |
| if (numFilterFields > 0) { |
| int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0)); |
| fieldPermutation[numKeys] = idx; |
| } |
| |
| int[] prevFieldPermutation = null; |
| if (indexOp == IndexOperation.UPSERT) { |
| // Find permutations for prev value |
| prevFieldPermutation = new int[numKeys + numFilterFields]; |
| i = 0; |
| |
| // If the index is partitioned: [token, number of tokens] |
| // Otherwise: [token] |
| for (LogicalVariable varKey : prevSecondaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| prevFieldPermutation[i] = idx; |
| i++; |
| } |
| |
| // Primary keys keep their positions; i stays fixed while k advances. |
| for (int k = 0; k < primaryKeys.size(); k++) { |
| prevFieldPermutation[k + i] = fieldPermutation[k + i]; |
| } |
| |
| if (numFilterFields > 0) { |
| int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0)); |
| prevFieldPermutation[numKeys] = idx; |
| } |
| } |
| try { |
| // Index parameters. |
| Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), indexName); |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = |
| getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); |
| |
| // prepare callback |
| IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( |
| storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields); |
| IIndexDataflowHelperFactory indexDataFlowFactory = new IndexDataflowHelperFactory( |
| storageComponentProvider.getStorageManager(), splitsAndConstraint.first); |
| IOperatorDescriptor op; |
| if (bulkload) { |
| long numElementsHint = getCardinalityPerPartitionHint(dataset); |
| op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation, |
| StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory, |
| null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory); |
| } else if (indexOp == IndexOperation.UPSERT) { |
| int upsertIndicatorFieldIndex = propagatedSchema.findVariable(upsertIndicatorVar); |
| op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory, |
| filterFactory, modificationCallbackFactory, upsertIndicatorFieldIndex, |
| BinaryBooleanInspector.FACTORY, prevFieldPermutation); |
| } else { |
| op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp, |
| indexDataFlowFactory, filterFactory, false, modificationCallbackFactory); |
| } |
| return new Pair<>(op, splitsAndConstraint.second); |
| } catch (Exception e) { |
| throw new AlgebricksException(e); |
| } |
| } |
| |
| // Get a tokenizer for bulk-loading data into an n-gram or keyword index. |
| private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBinaryTokenizerRuntime( |
| DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema inputSchema, |
| IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys, |
| RecordDescriptor recordDesc, JobSpecification spec, IndexType indexType) throws AlgebricksException { |
| |
| // Sanity checks. |
| if (primaryKeys.size() > 1) { |
| throw new AlgebricksException("Cannot tokenize composite primary key."); |
| } |
| if (secondaryKeys.size() > 1) { |
| throw new AlgebricksException("Cannot tokenize composite secondary key fields."); |
| } |
| |
| boolean isPartitioned = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX |
| || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX; |
| |
| // Number of keys that need to be propagated |
| int numKeys = inputSchema.getSize(); |
| |
| // Collect the remaining logical variables (neither PK nor SK) together |
| // with their positions; they will be propagated through the TokenizeOperator. |
| List<LogicalVariable> otherKeys = new ArrayList<>(); |
| for (int k = 0; k < inputSchema.getSize(); k++) { |
| LogicalVariable inputVar = inputSchema.getVariable(k); |
| if (!primaryKeys.contains(inputVar) && !secondaryKeys.contains(inputVar)) { |
| otherKeys.add(inputVar); |
| } |
| } |
| |
| // For tokenization, sorting, and loading: one token (+ an optional |
| // partitioning field) + primary keys + secondary keys + other variables. |
| // Secondary keys and other variables are simply passed through to the |
| // IndexInsertDelete operator. |
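| // Illustrative example: with a length-partitioned index and input |
| // [PK, other, SK] (numKeys = 3), the tokenizer emits |
| // [PK, other, SK, token, number of tokens], i.e. numTokenKeyPairFields = 5. |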
| int numTokenKeyPairFields = (!isPartitioned) ? 1 + numKeys : 2 + numKeys; |
| |
| // generate field permutations for the input |
| int[] fieldPermutation = new int[numKeys]; |
| |
| int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()]; |
| int i = 0; |
| int j = 0; |
| for (LogicalVariable varKey : primaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| modificationCallbackPrimaryKeyFields[j] = i; |
| i++; |
| j++; |
| } |
| for (LogicalVariable varKey : otherKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| i++; |
| } |
| for (LogicalVariable varKey : secondaryKeys) { |
| int idx = propagatedSchema.findVariable(varKey); |
| fieldPermutation[i] = idx; |
| i++; |
| } |
| |
| Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName); |
| String itemTypeName = dataset.getItemTypeName(); |
| IAType itemType; |
| try { |
| itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName) |
| .getDatatype(); |
| |
| if (itemType.getTypeTag() != ATypeTag.OBJECT) { |
| throw new AlgebricksException("Only record types can be tokenized."); |
| } |
| |
| ARecordType recType = (ARecordType) itemType; |
| |
| // Index parameters. |
| Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(), |
| dataset.getDatasetName(), indexName); |
| |
| List<List<String>> secondaryKeyExprs = secondaryIndex.getKeyFieldNames(); |
| List<IAType> secondaryKeyTypeEntries = secondaryIndex.getKeyFieldTypes(); |
| |
| int numTokenFields = (!isPartitioned) ? secondaryKeys.size() : secondaryKeys.size() + 1; |
| ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields]; |
| ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()]; |
| |
| // Find the key type of the secondary key. If it is a derived type, keep |
| // the derived type (e.g., an UNORDERED LIST key stays an UNORDERED LIST). |
| IAType secondaryKeyType; |
| Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryKeyTypeEntries.get(0), |
| secondaryKeyExprs.get(0), recType); |
| secondaryKeyType = keyPairType.first; |
| List<List<String>> partitioningKeys = dataset.getPrimaryKeys(); |
| i = 0; |
| for (List<String> partitioningKey : partitioningKeys) { |
| IAType keyType = recType.getSubFieldType(partitioningKey); |
| invListsTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType); |
| ++i; |
| } |
| |
| tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType); |
| if (isPartitioned) { |
| // The partitioning field is hardcoded to be a short *without* |
| // an Asterix type tag. |
| tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS; |
| } |
| |
| IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory( |
| secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength()); |
| |
| Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = |
| getSplitProviderAndConstraints(dataset, secondaryIndex.getIndexName()); |
| |
| // Generate the output record format |
| ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields]; |
| ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields]; |
| ISerializerDeserializerProvider serdeProvider = getDataFormat().getSerdeProvider(); |
| |
| // The order of the output record: propagated variables (including |
| // PK and SK), token, and number of tokens. |
| // #1. propagate all input variables |
| for (int k = 0; k < recordDesc.getFieldCount(); k++) { |
| tokenKeyPairFields[k] = recordDesc.getFields()[k]; |
| tokenKeyPairTypeTraits[k] = recordDesc.getTypeTraits()[k]; |
| } |
| int tokenOffset = recordDesc.getFieldCount(); |
| |
| // #2. Specify the token type |
| tokenKeyPairFields[tokenOffset] = serdeProvider.getSerializerDeserializer(secondaryKeyType); |
| tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[0]; |
| tokenOffset++; |
| |
| // #3. Specify the length-partitioning key: number of tokens |
| if (isPartitioned) { |
| tokenKeyPairFields[tokenOffset] = ShortSerializerDeserializer.INSTANCE; |
| tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[1]; |
| } |
| |
| RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits); |
| IOperatorDescriptor tokenizerOp; |
| |
| // Field to be tokenized: the secondary key (the last field in the permutation) |
| int docField = fieldPermutation[fieldPermutation.length - 1]; |
| |
| // Keys to be propagated |
| int[] keyFields = new int[numKeys]; |
| for (int k = 0; k < keyFields.length; k++) { |
| keyFields[k] = k; |
| } |
| |
| tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory, docField, |
| keyFields, isPartitioned, true, false, MissingWriterFactory.INSTANCE); |
| return new Pair<>(tokenizerOp, splitsAndConstraint.second); |
| } catch (Exception e) { |
| throw new AlgebricksException(e); |
| } |
| } |
| |
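| // Builds a tuple filter factory from the optional filter expression. Returns |
| // null when no filter expression is present, in which case callers pass no |
| // tuple filter to the runtime operators. |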
| @Override |
| public AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas, |
| IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context) |
| throws AlgebricksException { |
| // No filtering condition. |
| if (filterExpr == null) { |
| return null; |
| } |
| IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider(); |
| IScalarEvaluatorFactory filterEvalFactory = |
| expressionRuntimeProvider.createEvaluatorFactory(filterExpr, typeEnv, inputSchemas, context); |
| return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory()); |
| } |
| |
| private void validateRecordType(IAType itemType) throws AlgebricksException { |
| if (itemType.getTypeTag() != ATypeTag.OBJECT) { |
| throw new AlgebricksException("Only record types can be indexed."); |
| } |
| } |
| |
| public IStorageComponentProvider getStorageComponentProvider() { |
| return storageComponentProvider; |
| } |
| |
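| // Convenience overload for the primary index, which shares its name with the |
| // dataset. |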
| public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds) |
| throws AlgebricksException { |
| return getSplitProviderAndConstraints(ds, ds.getDatasetName()); |
| } |
| |
| public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> getSplitProviderAndConstraints(Dataset ds, |
| String indexName) throws AlgebricksException { |
| FileSplit[] splits = splitsForIndex(mdTxnCtx, ds, indexName); |
| return StoragePathUtil.splitProviderAndPartitionConstraints(splits); |
| } |
| |
| public LockList getLocks() { |
| return locks; |
| } |
| |
| public ICcApplicationContext getApplicationContext() { |
| return appCtx; |
| } |
| |
| public ITxnIdFactory getTxnIdFactory() { |
| return appCtx.getTxnIdFactory(); |
| } |
| |
| public ICompressionManager getCompressionManager() { |
| return appCtx.getCompressionManager(); |
| } |
| |
| public void addAccessedDataset(Dataset dataset) { |
| txnAccessedDatasets.add(dataset); |
| } |
| |
| public Set<Dataset> getAccessedDatasets() { |
| return Collections.unmodifiableSet(txnAccessedDatasets); |
| } |
| } |