/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.metadata.declared;
import static org.apache.asterix.common.api.IIdentifierMapper.Modifier.PLURAL;
import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
import static org.apache.asterix.metadata.utils.MetadataConstants.METADATA_OBJECT_NAME_INVALID_CHARS;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
import org.apache.asterix.common.config.StorageProperties;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.external.IDataSourceAdapter;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.metadata.LockList;
import org.apache.asterix.common.storage.ICompressionManager;
import org.apache.asterix.common.transactions.ITxnIdFactory;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
import org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
import org.apache.asterix.external.adapter.factory.ExternalAdapterFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.policy.FeedPolicyAccessor;
import org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.provider.AdapterFactoryProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
import org.apache.asterix.formats.nontagged.LinearizeComparatorFactoryProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.api.ICCExtensionManager;
import org.apache.asterix.metadata.bootstrap.MetadataBuiltinEntities;
import org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetCardinalityHint;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.DatasourceAdapter;
import org.apache.asterix.metadata.entities.Datatype;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedConnection;
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.entities.FullTextConfigMetadataEntity;
import org.apache.asterix.metadata.entities.FullTextFilterMetadataEntity;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.Synonym;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
import org.apache.asterix.metadata.utils.DataPartitioningProvider;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.FullTextUtil;
import org.apache.asterix.metadata.utils.IndexUtil;
import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionExtensionManager;
import org.apache.asterix.om.functions.IFunctionManager;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.NonTaggedFormatUtil;
import org.apache.asterix.runtime.base.AsterixTupleFilterFactory;
import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
import org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.common.utils.Triple;
import org.apache.hyracks.algebricks.core.algebra.base.Counter;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSink;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import org.apache.hyracks.algebricks.data.IAWriterFactory;
import org.apache.hyracks.algebricks.data.IPrinterFactory;
import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider;
import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider;
import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultSetId;
import org.apache.hyracks.data.std.primitive.ShortPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeBatchPointSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
import org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.IFullTextConfigEvaluatorFactory;
import org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.common.IStorageManager;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
public class MetadataProvider implements IMetadataProvider<DataSourceId, String> {
private final ICcApplicationContext appCtx;
private final IStorageComponentProvider storageComponentProvider;
private final StorageProperties storageProperties;
private final IFunctionManager functionManager;
private final LockList locks;
private final Map<String, Object> config;
private Dataverse defaultDataverse;
private MetadataTransactionContext mdTxnCtx;
private boolean isWriteTransaction;
private FileSplit outputFile;
private boolean asyncResults;
private long maxResultReads;
private ResultSetId resultSetId;
private Counter resultSetIdCounter;
private TxnId txnId;
private boolean blockingOperatorDisabled = false;
private final DataPartitioningProvider dataPartitioningProvider;
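/**
 * Creates a {@link MetadataProvider}. If the CC extension manager supplies a metadata-provider
 * factory, that factory is used to build the instance; otherwise a plain {@code MetadataProvider}
 * is constructed. The supplied default dataverse is then set on the returned provider.
 */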
public static MetadataProvider create(ICcApplicationContext appCtx, Dataverse defaultDataverse) {
java.util.function.Function<ICcApplicationContext, IMetadataProvider<?, ?>> factory =
((ICCExtensionManager) appCtx.getExtensionManager()).getMetadataProviderFactory();
MetadataProvider mp = factory != null ? (MetadataProvider) factory.apply(appCtx) : new MetadataProvider(appCtx);
mp.setDefaultDataverse(defaultDataverse);
return mp;
}
protected MetadataProvider(ICcApplicationContext appCtx) {
this.appCtx = appCtx;
this.storageComponentProvider = appCtx.getStorageComponentProvider();
storageProperties = appCtx.getStorageProperties();
functionManager = ((IFunctionExtensionManager) appCtx.getExtensionManager()).getFunctionManager();
dataPartitioningProvider = (DataPartitioningProvider) appCtx.getDataPartitioningProvider();
locks = new LockList();
config = new HashMap<>();
}
@SuppressWarnings("unchecked")
public <T> T getProperty(String name) {
return (T) config.get(name);
}
public void setProperty(String name, Object value) {
config.put(name, value);
}
@SuppressWarnings("unchecked")
public <T> T removeProperty(String name) {
return (T) config.remove(name);
}
public boolean getBooleanProperty(String name, boolean defaultValue) {
Object v = config.get(name);
return v != null ? Boolean.parseBoolean(String.valueOf(v)) : defaultValue;
}
public void disableBlockingOperator() {
blockingOperatorDisabled = true;
}
@Override
public boolean isBlockingOperatorDisabled() {
return blockingOperatorDisabled;
}
@Override
public Map<String, Object> getConfig() {
return config;
}
public void setTxnId(TxnId txnId) {
this.txnId = txnId;
}
public void setDefaultDataverse(Dataverse defaultDataverse) {
this.defaultDataverse = defaultDataverse == null ? MetadataBuiltinEntities.DEFAULT_DATAVERSE : defaultDataverse;
}
public Dataverse getDefaultDataverse() {
return defaultDataverse;
}
public DataverseName getDefaultDataverseName() {
return defaultDataverse.getDataverseName();
}
public void setWriteTransaction(boolean writeTransaction) {
this.isWriteTransaction = writeTransaction;
}
public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) {
this.mdTxnCtx = mdTxnCtx;
}
public MetadataTransactionContext getMetadataTxnContext() {
return mdTxnCtx;
}
public FileSplit getOutputFile() {
return outputFile;
}
public void setOutputFile(FileSplit outputFile) {
this.outputFile = outputFile;
}
public boolean getResultAsyncMode() {
return asyncResults;
}
public void setResultAsyncMode(boolean asyncResults) {
this.asyncResults = asyncResults;
}
public void setMaxResultReads(long maxResultReads) {
this.maxResultReads = maxResultReads;
}
public long getMaxResultReads() {
return maxResultReads;
}
public ResultSetId getResultSetId() {
return resultSetId;
}
public void setResultSetId(ResultSetId resultSetId) {
this.resultSetId = resultSetId;
}
public Counter getResultSetIdCounter() {
return resultSetIdCounter;
}
public void setResultSetIdCounter(Counter resultSetIdCounter) {
this.resultSetIdCounter = resultSetIdCounter;
}
public boolean isWriteTransaction() {
// The transaction writes persistent datasets.
return isWriteTransaction;
}
public IFunctionManager getFunctionManager() {
return functionManager;
}
public IDataFormat getDataFormat() {
return FormatUtils.getDefaultFormat();
}
public StorageProperties getStorageProperties() {
return storageProperties;
}
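// Returns the explicitly supplied dataverse name if present; otherwise falls back to the
// session's default dataverse (which may itself be null).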
private DataverseName getActiveDataverseName(DataverseName dataverseName) {
return dataverseName != null ? dataverseName
: defaultDataverse != null ? defaultDataverse.getDataverseName() : null;
}
/**
* Retrieve the Output RecordType, as defined by "set output-record-type".
*/
public ARecordType findOutputRecordType() throws AlgebricksException {
return MetadataManagerUtil.findOutputRecordType(mdTxnCtx, getDefaultDataverseName(),
getProperty("output-record-type"));
}
public Dataset findDataset(DataverseName dataverseName, String datasetName) throws AlgebricksException {
return findDataset(dataverseName, datasetName, false);
}
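// Acquires read locks on the dataverse and then on the dataset before performing the metadata lookup.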
public Dataset findDataset(DataverseName dataverseName, String datasetName, boolean includingViews)
throws AlgebricksException {
DataverseName dvName = getActiveDataverseName(dataverseName);
if (dvName == null) {
return null;
}
appCtx.getMetadataLockManager().acquireDataverseReadLock(locks, dvName);
appCtx.getMetadataLockManager().acquireDatasetReadLock(locks, dvName, datasetName);
return MetadataManagerUtil.findDataset(mdTxnCtx, dvName, datasetName, includingViews);
}
public INodeDomain findNodeDomain(String nodeGroupName) throws AlgebricksException {
return MetadataManagerUtil.findNodeDomain(appCtx.getClusterStateManager(), mdTxnCtx, nodeGroupName);
}
public List<String> findNodes(String nodeGroupName) throws AlgebricksException {
return MetadataManagerUtil.findNodes(mdTxnCtx, nodeGroupName);
}
public Datatype findTypeEntity(DataverseName dataverseName, String typeName) throws AlgebricksException {
return MetadataManagerUtil.findTypeEntity(mdTxnCtx, dataverseName, typeName);
}
public IAType findTypeForDatasetWithoutType(IAType recordType, IAType metaRecordType, Dataset dataset)
throws AlgebricksException {
return MetadataManagerUtil.findTypeForDatasetWithoutType(recordType, metaRecordType, dataset);
}
public IAType findType(DataverseName dataverseName, String typeName) throws AlgebricksException {
return MetadataManagerUtil.findType(mdTxnCtx, dataverseName, typeName);
}
public IAType findType(Dataset dataset) throws AlgebricksException {
return findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
}
public IAType findMetaType(Dataset dataset) throws AlgebricksException {
return findType(dataset.getMetaItemTypeDataverseName(), dataset.getMetaItemTypeName());
}
public Feed findFeed(DataverseName dataverseName, String feedName) throws AlgebricksException {
return MetadataManagerUtil.findFeed(mdTxnCtx, dataverseName, feedName);
}
public FeedConnection findFeedConnection(DataverseName dataverseName, String feedName, String datasetName)
throws AlgebricksException {
return MetadataManagerUtil.findFeedConnection(mdTxnCtx, dataverseName, feedName, datasetName);
}
public FeedPolicyEntity findFeedPolicy(DataverseName dataverseName, String policyName) throws AlgebricksException {
return MetadataManagerUtil.findFeedPolicy(mdTxnCtx, dataverseName, policyName);
}
@Override
public DataSource findDataSource(DataSourceId id) throws AlgebricksException {
return MetadataManagerUtil.findDataSource(appCtx.getClusterStateManager(), mdTxnCtx, id);
}
public DataSource lookupSourceInMetadata(DataSourceId aqlId) throws AlgebricksException {
return MetadataManagerUtil.lookupSourceInMetadata(appCtx.getClusterStateManager(), mdTxnCtx, aqlId);
}
@Override
public IDataSourceIndex<String, DataSourceId> findDataSourceIndex(String indexId, DataSourceId dataSourceId)
throws AlgebricksException {
DataSource source = findDataSource(dataSourceId);
Dataset dataset = ((DatasetDataSource) source).getDataset();
// index could be a primary index or secondary index
DataverseName dataverseName = dataset.getDataverseName();
String datasetName = dataset.getDatasetName();
Index index = getIndex(dataverseName, datasetName, indexId);
return index != null ? new DataSourceIndex(index, dataverseName, datasetName, this) : null;
}
public Index getIndex(DataverseName dataverseName, String datasetName, String indexName)
throws AlgebricksException {
return MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
}
public List<Index> getDatasetIndexes(DataverseName dataverseName, String datasetName) throws AlgebricksException {
return MetadataManagerUtil.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
}
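// Returns whichever of the two candidate sample indexes exists and has no pending metadata
// operation, or null if neither does.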
public Index findSampleIndex(DataverseName dataverseName, String datasetName) throws AlgebricksException {
Pair<String, String> sampleIndexNames = IndexUtil.getSampleIndexNames(datasetName);
Index sampleIndex = getIndex(dataverseName, datasetName, sampleIndexNames.first);
if (sampleIndex != null && sampleIndex.getPendingOp() == MetadataUtil.PENDING_NO_OP) {
return sampleIndex;
}
sampleIndex = getIndex(dataverseName, datasetName, sampleIndexNames.second);
return sampleIndex != null && sampleIndex.getPendingOp() == MetadataUtil.PENDING_NO_OP ? sampleIndex : null;
}
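// Follows the synonym chain until an existing dataset (or view, if includingViews is set) is
// found. Returns the resolved dataverse/dataset names and whether any synonym was traversed,
// or null if resolution fails.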
public Triple<DataverseName, String, Boolean> resolveDatasetNameUsingSynonyms(DataverseName dataverseName,
String datasetName, boolean includingViews) throws AlgebricksException {
DataverseName dvName = getActiveDataverseName(dataverseName);
if (dvName == null) {
return null;
}
Synonym synonym = null;
while (MetadataManagerUtil.findDataset(mdTxnCtx, dvName, datasetName, includingViews) == null) {
synonym = findSynonym(dvName, datasetName);
if (synonym == null) {
return null;
}
dvName = synonym.getObjectDataverseName();
datasetName = synonym.getObjectName();
}
return new Triple<>(dvName, datasetName, synonym != null);
}
public Synonym findSynonym(DataverseName dataverseName, String synonymName) throws AlgebricksException {
return MetadataManagerUtil.findSynonym(mdTxnCtx, dataverseName, synonymName);
}
public FullTextConfigMetadataEntity findFullTextConfig(DataverseName dataverseName, String ftConfigName)
throws AlgebricksException {
return MetadataManagerUtil.findFullTextConfigDescriptor(mdTxnCtx, dataverseName, ftConfigName);
}
public FullTextFilterMetadataEntity findFullTextFilter(DataverseName dataverseName, String ftFilterName)
throws AlgebricksException {
return MetadataManagerUtil.findFullTextFilterDescriptor(mdTxnCtx, dataverseName, ftFilterName);
}
@Override
public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
return BuiltinFunctions.getBuiltinFunctionInfo(fid);
}
public Function lookupUserDefinedFunction(FunctionSignature signature) throws AlgebricksException {
if (signature.getDataverseName() == null) {
return null;
}
return MetadataManager.INSTANCE.getFunction(mdTxnCtx, signature);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
IDataSource<DataSourceId> dataSource, List<LogicalVariable> scanVariables,
List<LogicalVariable> projectVariables, boolean projectPushed, List<LogicalVariable> minFilterVars,
List<LogicalVariable> maxFilterVars, ITupleFilterFactory tupleFilterFactory, long outputLimit,
IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec,
Object implConfig, IProjectionFiltrationInfo<?> projectionInfo,
IProjectionFiltrationInfo<?> metaProjectionInfo) throws AlgebricksException {
return ((DataSource) dataSource).buildDatasourceScanRuntime(this, dataSource, scanVariables, projectVariables,
projectPushed, minFilterVars, maxFilterVars, tupleFilterFactory, outputLimit, opSchema, typeEnv,
context, jobSpec, implConfig, projectionInfo, metaProjectionInfo);
}
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getLoadableDatasetScanRuntime(
JobSpecification jobSpec, ITypedAdapterFactory adapterFactory, RecordDescriptor rDesc)
throws AlgebricksException {
ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
try {
return new Pair<>(dataScanner, adapterFactory.getPartitionConstraint());
} catch (Exception e) {
throw new AlgebricksException(e);
}
}
public Dataverse findDataverse(DataverseName dataverseName) throws AlgebricksException {
return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
}
public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint, ITypedAdapterFactory> getFeedIntakeRuntime(
JobSpecification jobSpec, Feed feed, FeedPolicyAccessor policyAccessor) throws Exception {
Triple<ITypedAdapterFactory, RecordDescriptor, IDataSourceAdapter.AdapterType> factoryOutput;
factoryOutput =
FeedMetadataUtil.getFeedFactoryAndOutput(feed, policyAccessor, mdTxnCtx, getApplicationContext());
ARecordType recordType =
FeedMetadataUtil.getOutputType(feed, feed.getConfiguration().get(ExternalDataConstants.KEY_TYPE_NAME));
ITypedAdapterFactory adapterFactory = factoryOutput.first;
FeedIntakeOperatorDescriptor feedIngestor = null;
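// For INTERNAL adapters the adapter factory itself is embedded in the intake operator; for
// EXTERNAL adapters only the library dataverse, library name, and class name are passed along.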
switch (factoryOutput.third) {
case INTERNAL:
feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, adapterFactory, recordType,
policyAccessor, factoryOutput.second);
break;
case EXTERNAL:
ExternalAdapterFactory extAdapterFactory = (ExternalAdapterFactory) adapterFactory;
feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feed, extAdapterFactory.getLibraryDataverse(),
extAdapterFactory.getLibraryName(), extAdapterFactory.getClassName(), recordType,
policyAccessor, factoryOutput.second);
break;
default:
break;
}
AlgebricksPartitionConstraint partitionConstraint = adapterFactory.getPartitionConstraint();
return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreeSearchRuntime(JobSpecification jobSpec,
IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context, boolean retainInput,
boolean retainMissing, IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
boolean propagateFilter, IMissingWriterFactory nonFilterWriterFactory, int[] minFilterFieldIndexes,
int[] maxFilterFieldIndexes, ITupleFilterFactory tupleFilterFactory, long outputLimit,
boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, ITupleProjectorFactory tupleProjectorFactory,
boolean partitionInputTuples) throws AlgebricksException {
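// The primary index shares the dataset's name; the requested index is secondary unless its name
// matches the primary index's name (external datasets are always treated as secondary here).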
boolean isSecondary = true;
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
if (primaryIndex != null && (dataset.getDatasetType() != DatasetType.EXTERNAL)) {
isSecondary = !indexName.equals(primaryIndex.getIndexName());
}
Index theIndex = isSecondary ? MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName) : primaryIndex;
int numSecondaryKeys;
switch (theIndex.getIndexType()) {
case ARRAY:
numSecondaryKeys = ((Index.ArrayIndexDetails) theIndex.getIndexDetails()).getElementList().stream()
.map(e -> e.getProjectList().size()).reduce(0, Integer::sum);
break;
case BTREE:
numSecondaryKeys = ((Index.ValueIndexDetails) theIndex.getIndexDetails()).getKeyFieldNames().size();
break;
case SAMPLE:
if (isIndexOnlyPlan) {
throw new CompilationException(ErrorCode.COMPILATION_ILLEGAL_STATE, "");
}
numSecondaryKeys = 0;
break;
default:
throw new CompilationException(ErrorCode.COMPILATION_UNKNOWN_INDEX_TYPE,
theIndex.getIndexType().toString());
}
int numPrimaryKeys = dataset.getPrimaryKeys().size();
RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
PartitioningProperties datasetPartitioningProp = getPartitioningProperties(dataset, theIndex.getIndexName());
int[] primaryKeyFields = new int[numPrimaryKeys];
for (int i = 0; i < numPrimaryKeys; i++) {
primaryKeyFields[i] = i;
}
int[] primaryKeyFieldsInSecondaryIndex = null;
byte[] successValueForIndexOnlyPlan = null;
byte[] failValueForIndexOnlyPlan = null;
boolean proceedIndexOnlyPlan = isIndexOnlyPlan && isSecondary;
if (proceedIndexOnlyPlan) {
primaryKeyFieldsInSecondaryIndex = new int[numPrimaryKeys];
for (int i = 0; i < numPrimaryKeys; i++) {
primaryKeyFieldsInSecondaryIndex[i] = i + numSecondaryKeys;
}
// Defines the return value from a secondary index search if this is an index-only plan.
failValueForIndexOnlyPlan = SerializerDeserializerUtil.computeByteArrayForIntValue(0);
successValueForIndexOnlyPlan = SerializerDeserializerUtil.computeByteArrayForIntValue(1);
}
ISearchOperationCallbackFactory searchCallbackFactory =
dataset.getSearchCallbackFactory(storageComponentProvider, theIndex, IndexOperation.SEARCH,
primaryKeyFields, primaryKeyFieldsInSecondaryIndex, proceedIndexOnlyPlan);
IStorageManager storageManager = getStorageComponentProvider().getStorageManager();
IIndexDataflowHelperFactory indexHelperFactory =
new IndexDataflowHelperFactory(storageManager, datasetPartitioningProp.getSplitsProvider());
BTreeSearchOperatorDescriptor btreeSearchOp;
int[][] partitionsMap = datasetPartitioningProp.getComputeStorageMap();
ITuplePartitionerFactory tuplePartitionerFactory = null;
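// If requested, hash-partition the incoming (low-key) search tuples with the dataset's
// primary-key hash functions across the dataset's storage partitions.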
if (partitionInputTuples) {
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
tuplePartitionerFactory = new FieldHashPartitionerFactory(lowKeyFields, pkHashFunFactories,
datasetPartitioningProp.getNumberOfPartitions());
}
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
btreeSearchOp = !isSecondary && isPrimaryIndexPointSearch
? new LSMBTreeBatchPointSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput,
retainMissing, nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes, tupleFilterFactory, outputLimit, tupleProjectorFactory,
tuplePartitionerFactory, partitionsMap)
: new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, indexHelperFactory, retainInput, retainMissing,
nonMatchWriterFactory, searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes,
propagateFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
proceedIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
} else {
btreeSearchOp = null;
}
return new Pair<>(btreeSearchOp, datasetPartitioningProp.getConstraints());
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRtreeSearchRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, boolean retainMissing,
IMissingWriterFactory nonMatchWriterFactory, Dataset dataset, String indexName, int[] keyFields,
boolean propagateFilter, IMissingWriterFactory nonFilterWriterFactory, int[] minFilterFieldIndexes,
int[] maxFilterFieldIndexes, boolean isIndexOnlyPlan) throws AlgebricksException {
int numPrimaryKeys = dataset.getPrimaryKeys().size();
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
if (secondaryIndex == null) {
throw new AlgebricksException("Code generation error: no index " + indexName + " for " + dataset() + " "
+ dataset.getDatasetName());
}
Index.ValueIndexDetails secondaryIndexDetails = (Index.ValueIndexDetails) secondaryIndex.getIndexDetails();
RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
PartitioningProperties partitioningProperties =
getPartitioningProperties(dataset, secondaryIndex.getIndexName());
int[] primaryKeyFields = new int[numPrimaryKeys];
for (int i = 0; i < numPrimaryKeys; i++) {
primaryKeyFields[i] = i;
}
int[] primaryKeyFieldsInSecondaryIndex = null;
byte[] successValueForIndexOnlyPlan = null;
byte[] failValueForIndexOnlyPlan = null;
if (isIndexOnlyPlan) {
ARecordType recType = (ARecordType) findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
List<List<String>> secondaryKeyFields = secondaryIndexDetails.getKeyFieldNames();
List<IAType> secondaryKeyTypes = secondaryIndexDetails.getKeyFieldTypes();
Pair<IAType, Boolean> keyTypePair = Index.getNonNullableOpenFieldType(secondaryIndex,
secondaryKeyTypes.get(0), secondaryKeyFields.get(0), recType);
IAType keyType = keyTypePair.first;
int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
int numNestedSecondaryKeyFields = numDimensions * 2;
primaryKeyFieldsInSecondaryIndex = new int[numPrimaryKeys];
for (int i = 0; i < numPrimaryKeys; i++) {
primaryKeyFieldsInSecondaryIndex[i] = i + numNestedSecondaryKeyFields;
}
// Defines the return value from a secondary index search if this is an index-only plan.
failValueForIndexOnlyPlan = SerializerDeserializerUtil.computeByteArrayForIntValue(0);
successValueForIndexOnlyPlan = SerializerDeserializerUtil.computeByteArrayForIntValue(1);
}
ISearchOperationCallbackFactory searchCallbackFactory =
dataset.getSearchCallbackFactory(storageComponentProvider, secondaryIndex, IndexOperation.SEARCH,
primaryKeyFields, primaryKeyFieldsInSecondaryIndex, isIndexOnlyPlan);
RTreeSearchOperatorDescriptor rtreeSearchOp;
IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
int[][] partitionsMap = partitioningProperties.getComputeStorageMap();
rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, keyFields, true, true,
indexDataflowHelperFactory, retainInput, retainMissing, nonMatchWriterFactory,
searchCallbackFactory, minFilterFieldIndexes, maxFilterFieldIndexes, propagateFilter,
nonFilterWriterFactory, isIndexOnlyPlan, failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
partitionsMap);
} else {
// No R-tree search operator is created for non-internal datasets.
rtreeSearchOp = null;
}
return new Pair<>(rtreeSearchOp, partitioningProperties.getConstraints());
}
@Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
RecordDescriptor inputDesc) {
FileSplitDataSink fsds = (FileSplitDataSink) sink;
FileSplitSinkId fssi = fsds.getId();
FileSplit fs = fssi.getFileSplit();
File outFile = new File(fs.getPath());
String nodeId = fs.getNodeName();
SinkWriterRuntimeFactory runtime =
new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, writerFactory, inputDesc);
AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
return new Pair<>(runtime, apc);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory,
IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc,
IResultMetadata metadata, JobSpecification spec) throws AlgebricksException {
ResultSetDataSink rsds = (ResultSetDataSink) sink;
ResultSetSinkId rssId = rsds.getId();
ResultSetId rsId = rssId.getResultSetId();
ResultWriterOperatorDescriptor resultWriter = null;
try {
IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider
.getResultSerializerFactoryProvider(printColumns, printerFactories, writerFactory);
resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, metadata, getResultAsyncMode(),
resultSerializedAppenderFactory, getMaxResultReads());
} catch (IOException e) {
throw new AlgebricksException(e);
}
return new Pair<>(resultWriter, null);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor inputRecordDesc, JobGenContext context,
JobSpecification spec, boolean bulkload) throws AlgebricksException {
return getInsertOrDeleteRuntime(IndexOperation.INSERT, dataSource, propagatedSchema, keys, payload,
additionalNonKeyFields, inputRecordDesc, context, spec, bulkload, additionalNonFilteringFields);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys, LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields,
List<LogicalVariable> additionalNonFilteringFields, RecordDescriptor inputRecordDesc, JobGenContext context,
JobSpecification spec) throws AlgebricksException {
return getInsertOrDeleteRuntime(IndexOperation.DELETE, dataSource, propagatedSchema, keys, payload,
additionalNonKeyFields, inputRecordDesc, context, spec, false, additionalNonFilteringFields);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getUpsertRuntime(
IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> primaryKeys, LogicalVariable payload, List<LogicalVariable> filterKeys,
List<LogicalVariable> additionalNonFilterFields, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec) throws AlgebricksException {
DataverseName dataverseName = dataSource.getId().getDataverseName();
String datasetName = dataSource.getId().getDatasourceName();
Dataset dataset = findDataset(dataverseName, datasetName);
if (dataset == null) {
throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, datasetName, dataverseName);
}
int numKeys = primaryKeys.size();
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
int numOfAdditionalFields = additionalNonFilterFields == null ? 0 : additionalNonFilterFields.size();
// Move key fields to front. [keys, record, filters]
int[] fieldPermutation = new int[numKeys + 1 + numFilterFields + numOfAdditionalFields];
int[] bloomFilterKeyFields = new int[numKeys];
int i = 0;
// set the keys' permutations
for (LogicalVariable varKey : primaryKeys) {
int idx = inputSchema.findVariable(varKey);
fieldPermutation[i] = idx;
bloomFilterKeyFields[i] = i;
i++;
}
// set the record permutation
fieldPermutation[i++] = inputSchema.findVariable(payload);
// set the meta record permutation
if (additionalNonFilterFields != null) {
for (LogicalVariable var : additionalNonFilterFields) {
int idx = inputSchema.findVariable(var);
fieldPermutation[i++] = idx;
}
}
// set the filters' permutations.
if (numFilterFields > 0) {
int idx = inputSchema.findVariable(filterKeys.get(0));
fieldPermutation[i++] = idx;
}
return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc, fieldPermutation,
context.getMissingWriterFactory());
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
boolean bulkload, List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
throws AlgebricksException {
return getIndexModificationRuntime(IndexOperation.INSERT, dataSourceIndex, propagatedSchema, inputSchemas,
typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, null, recordDesc, context,
spec, bulkload, null, null, null, secondaryKeysPipelines, pipelineTopSchema);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
throws AlgebricksException {
return getIndexModificationRuntime(IndexOperation.DELETE, dataSourceIndex, propagatedSchema, inputSchemas,
typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields, filterExpr, null, recordDesc, context,
spec, false, null, null, null, secondaryKeysPipelines, pipelineTopSchema);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexUpsertRuntime(
IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalFilteringKeys,
ILogicalExpression filterExpr, ILogicalExpression prevFilterExpr, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey,
RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec,
List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws AlgebricksException {
return getIndexModificationRuntime(IndexOperation.UPSERT, dataSourceIndex, propagatedSchema, inputSchemas,
typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys, filterExpr, prevFilterExpr, recordDesc,
context, spec, false, operationVar, prevSecondaryKeys, prevAdditionalFilteringKey,
secondaryKeysPipelines, null);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getTokenizerRuntime(
IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec, boolean bulkload) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
DataverseName dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
IOperatorSchema inputSchema;
if (inputSchemas.length > 0) {
inputSchema = inputSchemas[0];
} else {
throw new AlgebricksException("TokenizeOperator can not operate without any input variable.");
}
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
// TokenizeOperator only supports a keyword or n-gram index.
switch (secondaryIndex.getIndexType()) {
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
case LENGTH_PARTITIONED_NGRAM_INVIX:
return getBinaryTokenizerRuntime(dataverseName, datasetName, indexName, inputSchema, propagatedSchema,
primaryKeys, secondaryKeys, recordDesc, spec, secondaryIndex.getIndexType());
default:
throw new AlgebricksException("Currently, we do not support TokenizeOperator for the index type: "
+ secondaryIndex.getIndexType());
}
}
/**
* Calculates an estimate of the number of elements used to size the bloom filter. Note that this
* is an estimate which assumes that the data is uniformly distributed across all partitions.
*
* @param dataset the dataset whose cardinality hint is consulted
* @return the number of elements that will be used to create a bloom filter per dataset per
* partition
* @throws AlgebricksException
*/
public long getCardinalityPerPartitionHint(Dataset dataset) throws AlgebricksException {
String numElementsHintString = dataset.getHints().get(DatasetCardinalityHint.NAME);
long numElementsHint;
if (numElementsHintString == null) {
numElementsHint = DatasetCardinalityHint.DEFAULT;
} else {
numElementsHint = Long.parseLong(numElementsHintString);
}
int numPartitions = getPartitioningProperties(dataset).getNumberOfPartitions();
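// e.g. (hypothetical numbers) a cardinality hint of 1,000,000 over 8 partitions sizes each
// per-partition bloom filter for 125,000 elements.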
return numElementsHint / numPartitions;
}
protected ITypedAdapterFactory getConfiguredAdapterFactory(Dataset dataset, String adapterName,
Map<String, String> configuration, ARecordType itemType, ARecordType metaType,
IWarningCollector warningCollector) throws AlgebricksException {
try {
configuration.put(ExternalDataConstants.KEY_DATASET_DATAVERSE,
dataset.getDataverseName().getCanonicalForm());
return AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(), adapterName,
configuration, itemType, metaType, warningCollector);
} catch (Exception e) {
throw new AlgebricksException("Unable to create adapter", e);
}
}
public TxnId getTxnId() {
return txnId;
}
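// numKeyFields counts both the lower and upper bound of each dimension (cf. numDimensions * 2 in
// the R-tree search runtime above), so half of that count is passed to the comparator provider.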
public static ILinearizeComparatorFactory proposeLinearizer(ATypeTag keyType, int numKeyFields)
throws AlgebricksException {
return LinearizeComparatorFactoryProvider.INSTANCE.getLinearizeComparatorFactory(keyType, true,
numKeyFields / 2);
}
public PartitioningProperties splitAndConstraints(DataverseName dataverseName) {
return dataPartitioningProvider.getPartitioningProperties(dataverseName);
}
public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, Dataset dataset, String indexName)
throws AlgebricksException {
return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, dataset, indexName).getSplitsProvider()
.getFileSplits();
}
public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, DataverseName dataverseName,
String adapterName) throws AlgebricksException {
DatasourceAdapter adapter;
// search in default namespace (built-in adapter)
adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
// search in dataverse (user-defined adapter)
if (adapter == null) {
adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
}
return adapter;
}
public AlgebricksAbsolutePartitionConstraint getClusterLocations() {
return appCtx.getClusterStateManager().getClusterLocations();
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataLookupRuntime(
JobSpecification jobSpec, Dataset dataset, int[] ridIndexes, boolean retainInput,
IVariableTypeEnvironment typeEnv, IOperatorSchema opSchema, JobGenContext context,
MetadataProvider metadataProvider, boolean retainMissing) throws AlgebricksException {
return null;
}
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> createPrimaryIndexUpsertOp(JobSpecification spec,
MetadataProvider metadataProvider, Dataset dataset, RecordDescriptor inputRecordDesc,
int[] fieldPermutation, IMissingWriterFactory missingWriterFactory) throws AlgebricksException {
// this can be used by extensions to pick up their own operators
return DatasetUtil.createPrimaryIndexUpsertOp(spec, this, dataset, inputRecordDesc, fieldPermutation,
missingWriterFactory);
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getExternalDatasetScanRuntime(
JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory adapterFactory,
ITupleFilterFactory tupleFilterFactory, long outputLimit) throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.OBJECT) {
throw new AlgebricksException("Can only scan " + dataset(PLURAL) + "of records.");
}
ISerializerDeserializer<?> payloadSerde =
getDataFormat().getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
ExternalScanOperatorDescriptor dataScanner = new ExternalScanOperatorDescriptor(jobSpec, scannerDesc,
adapterFactory, tupleFilterFactory, outputLimit);
//TODO(partitioning) check
AlgebricksPartitionConstraint constraint;
try {
constraint = adapterFactory.getPartitionConstraint();
} catch (Exception e) {
throw new AlgebricksException(e);
}
return new Pair<>(dataScanner, constraint);
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOperation indexOp,
IDataSource<DataSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payload, List<LogicalVariable> additionalNonKeyFields, RecordDescriptor inputRecordDesc,
JobGenContext context, JobSpecification spec, boolean bulkload,
List<LogicalVariable> additionalNonFilteringFields) throws AlgebricksException {
String datasetName = dataSource.getId().getDatasourceName();
Dataset dataset =
MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataSource.getId().getDataverseName(), datasetName);
int numKeys = keys.size();
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
// Move key fields to front.
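// Resulting layout: [primary keys, payload record, optional non-filtering (meta) fields, optional filter field].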
int[] fieldPermutation = new int[numKeys + 1 + numFilterFields
+ (additionalNonFilteringFields == null ? 0 : additionalNonFilteringFields.size())];
int[] bloomFilterKeyFields = new int[numKeys];
int[] pkFields = new int[numKeys];
int i = 0;
for (LogicalVariable varKey : keys) {
int idx = propagatedSchema.findVariable(varKey);
pkFields[i] = idx;
fieldPermutation[i] = idx;
bloomFilterKeyFields[i] = i;
i++;
}
fieldPermutation[i++] = propagatedSchema.findVariable(payload);
if (additionalNonFilteringFields != null) {
for (LogicalVariable variable : additionalNonFilteringFields) {
int idx = propagatedSchema.findVariable(variable);
fieldPermutation[i++] = idx;
}
}
int[] filterFields = new int[numFilterFields];
if (numFilterFields > 0) {
int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
fieldPermutation[i++] = idx;
filterFields[0] = idx;
}
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
PartitioningProperties partitioningProperties = getPartitioningProperties(dataset);
// prepare callback
int[] primaryKeyFields = new int[numKeys];
for (i = 0; i < numKeys; i++) {
primaryKeyFields[i] = i;
}
IModificationOperationCallbackFactory modificationCallbackFactory = dataset
.getModificationCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
partitioningProperties.getSplitsProvider());
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
//TODO(partitioning) rename to static
ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
partitioningProperties.getNumberOfPartitions());
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, true, numElementsHint, true, idfh, null,
BulkLoadUsage.LOAD, dataset.getDatasetId(), null, partitionerFactory,
partitioningProperties.getComputeStorageMap());
} else {
if (indexOp == IndexOperation.INSERT) {
ISearchOperationCallbackFactory searchCallbackFactory = dataset
.getSearchCallbackFactory(storageComponentProvider, primaryIndex, indexOp, primaryKeyFields);
Optional<Index> primaryKeyIndex = MetadataManager.INSTANCE
.getDatasetIndexes(mdTxnCtx, dataset.getDataverseName(), dataset.getDatasetName()).stream()
.filter(Index::isPrimaryKeyIndex).findFirst();
IIndexDataflowHelperFactory pkidfh = null;
if (primaryKeyIndex.isPresent()) {
PartitioningProperties idxPartitioningProperties =
getPartitioningProperties(dataset, primaryKeyIndex.get().getIndexName());
pkidfh = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
idxPartitioningProperties.getSplitsProvider());
}
op = createLSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, partitionerFactory,
partitioningProperties.getComputeStorageMap());
} else {
op = createLSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
null, true, modificationCallbackFactory, partitionerFactory,
partitioningProperties.getComputeStorageMap());
}
}
return new Pair<>(op, partitioningProperties.getConstraints());
}
protected LSMPrimaryInsertOperatorDescriptor createLSMPrimaryInsertOperatorDescriptor(JobSpecification spec,
RecordDescriptor inputRecordDesc, int[] fieldPermutation, IIndexDataflowHelperFactory idfh,
IIndexDataflowHelperFactory pkidfh, IModificationOperationCallbackFactory modificationCallbackFactory,
ISearchOperationCallbackFactory searchCallbackFactory, int numKeys, int[] filterFields,
ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) {
// this can be used by extensions to pick up their own operators
return new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh, pkidfh,
modificationCallbackFactory, searchCallbackFactory, numKeys, filterFields, tuplePartitionerFactory,
partitionsMap);
}
protected LSMTreeInsertDeleteOperatorDescriptor createLSMTreeInsertDeleteOperatorDescriptor(
IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, int[] fieldPermutation, IndexOperation op,
IIndexDataflowHelperFactory indexHelperFactory, ITupleFilterFactory tupleFilterFactory, boolean isPrimary,
IModificationOperationCallbackFactory modCallbackFactory, ITuplePartitionerFactory tuplePartitionerFactory,
int[][] partitionsMap) {
return new LSMTreeInsertDeleteOperatorDescriptor(spec, outRecDesc, fieldPermutation, op, indexHelperFactory,
tupleFilterFactory, isPrimary, modCallbackFactory, tuplePartitionerFactory, partitionsMap);
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexModificationRuntime(IndexOperation indexOp,
IDataSourceIndex<String, DataSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, List<LogicalVariable> additionalNonKeyFields,
ILogicalExpression filterExpr, ILogicalExpression prevFilterExpr, RecordDescriptor inputRecordDesc,
JobGenContext context, JobSpecification spec, boolean bulkload, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, LogicalVariable prevAdditionalFilteringKey,
List<List<AlgebricksPipeline>> secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
throws AlgebricksException {
String indexName = dataSourceIndex.getId();
DataverseName dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
String datasetName = dataSourceIndex.getDataSource().getId().getDatasourceName();
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
ArrayList<LogicalVariable> prevAdditionalFilteringKeys = null;
if (indexOp == IndexOperation.UPSERT && prevAdditionalFilteringKey != null) {
prevAdditionalFilteringKeys = new ArrayList<>();
prevAdditionalFilteringKeys.add(prevAdditionalFilteringKey);
}
// If we have a pipeline, then we need to pass the schema of the pipeline to the filter factory.
AsterixTupleFilterFactory filterFactory;
AsterixTupleFilterFactory prevFilterFactory;
if (pipelineTopSchema != null) {
IOperatorSchema[] schemasForFilterFactory = new IOperatorSchema[inputSchemas.length + 1];
System.arraycopy(inputSchemas, 0, schemasForFilterFactory, 0, inputSchemas.length);
schemasForFilterFactory[inputSchemas.length] = pipelineTopSchema;
filterFactory = createTupleFilterFactory(schemasForFilterFactory, typeEnv, filterExpr, context);
prevFilterFactory = createTupleFilterFactory(schemasForFilterFactory, typeEnv, prevFilterExpr, context);
} else {
filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
prevFilterFactory = createTupleFilterFactory(inputSchemas, typeEnv, prevFilterExpr, context);
}
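// Dispatch on the secondary index type; note that array indexes fall back to the plain B-tree
// path when bulk loading.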
switch (secondaryIndex.getIndexType()) {
case BTREE:
return getBTreeModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory, inputRecordDesc,
context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
case ARRAY:
if (bulkload) {
// In the case of bulk-load, we do not handle any nested plans. We perform the exact same behavior
// as a normal B-Tree bulk load.
return getBTreeModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema,
primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory,
inputRecordDesc, context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys,
prevAdditionalFilteringKeys);
} else {
return getArrayIndexModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema,
primaryKeys, additionalNonKeyFields, inputRecordDesc, spec, indexOp, operationVar,
secondaryKeysPipelines);
}
case RTREE:
return getRTreeModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory, inputRecordDesc,
context, spec, indexOp, bulkload, operationVar, prevSecondaryKeys, prevAdditionalFilteringKeys);
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
case LENGTH_PARTITIONED_NGRAM_INVIX:
return getInvertedIndexModificationRuntime(dataverseName, datasetName, indexName, propagatedSchema,
primaryKeys, secondaryKeys, additionalNonKeyFields, filterFactory, prevFilterFactory,
inputRecordDesc, context, spec, indexOp, secondaryIndex.getIndexType(), bulkload, operationVar,
prevSecondaryKeys, prevAdditionalFilteringKeys);
default:
throw new AlgebricksException(
indexOp.name() + " not implemented for index type: " + secondaryIndex.getIndexType());
}
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeModificationRuntime(
DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor inputRecordDesc, JobGenContext context,
JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
int numKeys = primaryKeys.size() + secondaryKeys.size();
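        // A dataset has at most one filter field.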
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
// generate field permutations
int[] fieldPermutation = new int[numKeys + numFilterFields];
int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
int[] pkFields = new int[primaryKeys.size()];
int i = 0;
int j = 0;
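        // fieldPermutation layout: [secondary keys, primary keys, optional filter field]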
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
pkFields[j] = idx;
modificationCallbackPrimaryKeyFields[j] = i;
i++;
j++;
}
if (numFilterFields > 0) {
int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
fieldPermutation[numKeys] = idx;
}
int[] prevFieldPermutation = null;
if (indexOp == IndexOperation.UPSERT) {
// generate field permutations for prev record
prevFieldPermutation = new int[numKeys + numFilterFields];
int k = 0;
for (LogicalVariable varKey : prevSecondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
prevFieldPermutation[k] = idx;
k++;
}
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
prevFieldPermutation[k] = idx;
k++;
}
            // The filter can only be a single field.
if (numFilterFields > 0) {
int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
prevFieldPermutation[numKeys] = idx;
}
}
try {
// Index parameters.
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
PartitioningProperties partitioningProperties =
getPartitioningProperties(dataset, secondaryIndex.getIndexName());
// prepare callback
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
partitioningProperties.getNumberOfPartitions());
IOperatorDescriptor op;
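            // Choose the operator: bulk load, secondary upsert (which also carries the previous
            // record's field permutation), or a plain LSM insert/delete.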
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, idfh, null,
BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory,
partitioningProperties.getComputeStorageMap());
} else if (indexOp == IndexOperation.UPSERT) {
int operationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, idfh,
filterFactory, prevFilterFactory, modificationCallbackFactory, operationFieldIndex,
BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory,
partitioningProperties.getComputeStorageMap());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, inputRecordDesc, fieldPermutation, indexOp, idfh,
filterFactory, false, modificationCallbackFactory, partitionerFactory,
partitioningProperties.getComputeStorageMap());
}
return new Pair<>(op, partitioningProperties.getConstraints());
} catch (Exception e) {
throw new AlgebricksException(e);
}
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getArrayIndexModificationRuntime(
DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
List<LogicalVariable> primaryKeys, List<LogicalVariable> additionalNonKeyFields,
RecordDescriptor inputRecordDesc, JobSpecification spec, IndexOperation indexOp,
LogicalVariable operationVar, List<List<AlgebricksPipeline>> secondaryKeysPipelines)
throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
int numPrimaryKeys = primaryKeys.size();
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
// Generate field permutations (this only includes primary keys and filter fields).
int[] fieldPermutation = new int[numPrimaryKeys + numFilterFields];
int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
int[] pkFields = new int[primaryKeys.size()];
int i = 0;
int j = 0;
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
pkFields[j] = idx;
modificationCallbackPrimaryKeyFields[j] = i;
i++;
j++;
}
if (numFilterFields > 0) {
int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
fieldPermutation[numPrimaryKeys] = idx;
}
try {
// Index parameters.
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
PartitioningProperties partitioningProperties =
getPartitioningProperties(dataset, secondaryIndex.getIndexName());
// Prepare callback.
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
IIndexDataflowHelperFactory idfh = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory(pkFields,
pkHashFunFactories, partitioningProperties.getNumberOfPartitions());
IOperatorDescriptor op;
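            // For UPSERT, secondaryKeysPipelines.get(0) computes the new secondary keys and
            // get(1) the previous ones; INSERT/DELETE only needs the first pipeline.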
if (indexOp == IndexOperation.UPSERT) {
int operationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertWithNestedPlanOperatorDescriptor(spec, inputRecordDesc, fieldPermutation,
idfh, modificationCallbackFactory, operationFieldIndex, BinaryIntegerInspector.FACTORY,
secondaryKeysPipelines.get(0), secondaryKeysPipelines.get(1), tuplePartitionerFactory,
partitioningProperties.getComputeStorageMap());
} else {
op = new LSMSecondaryInsertDeleteWithNestedPlanOperatorDescriptor(spec, inputRecordDesc,
fieldPermutation, indexOp, idfh, modificationCallbackFactory, secondaryKeysPipelines.get(0),
tuplePartitionerFactory, partitioningProperties.getComputeStorageMap());
}
return new Pair<>(op, partitioningProperties.getConstraints());
} catch (Exception e) {
throw new AlgebricksException(e);
}
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeModificationRuntime(
DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, IndexOperation indexOp, boolean bulkload, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable> prevAdditionalFilteringKeys)
throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
String itemTypeName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE
.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName).getDatatype();
validateRecordType(itemType);
ARecordType recType = (ARecordType) itemType;
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
Index.ValueIndexDetails secondaryIndexDetails = (Index.ValueIndexDetails) secondaryIndex.getIndexDetails();
List<List<String>> secondaryKeyExprs = secondaryIndexDetails.getKeyFieldNames();
List<IAType> secondaryKeyTypes = secondaryIndexDetails.getKeyFieldTypes();
Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryIndex, secondaryKeyTypes.get(0),
secondaryKeyExprs.get(0), recType);
IAType spatialType = keyPairType.first;
int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
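        // Each spatial dimension contributes two MBR coordinates (min and max), so the R-tree
        // has 2 * dimension secondary key fields.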
int numSecondaryKeys = dimension * 2;
int numPrimaryKeys = primaryKeys.size();
int numKeys = numSecondaryKeys + numPrimaryKeys;
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
int[] fieldPermutation = new int[numKeys + numFilterFields];
int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
int[] pkFields = new int[primaryKeys.size()];
int i = 0;
int j = 0;
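        // fieldPermutation layout: [MBR coordinates (secondary keys), primary keys, optional filter field]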
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
pkFields[j] = idx;
modificationCallbackPrimaryKeyFields[j] = i;
i++;
j++;
}
if (numFilterFields > 0) {
int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
fieldPermutation[numKeys] = idx;
}
int[] prevFieldPermutation = null;
if (indexOp == IndexOperation.UPSERT) {
// Get field permutation for previous value
prevFieldPermutation = new int[numKeys + numFilterFields];
i = 0;
            // Previous secondary keys come first, followed by the primary keys.
for (LogicalVariable varKey : prevSecondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
prevFieldPermutation[i] = idx;
i++;
}
            for (int k = 0; k < numPrimaryKeys; k++) {
                prevFieldPermutation[k + i] = fieldPermutation[k + i];
            }
if (numFilterFields > 0) {
int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
prevFieldPermutation[numKeys] = idx;
}
}
PartitioningProperties partitioningProperties =
getPartitioningProperties(dataset, secondaryIndex.getIndexName());
// prepare callback
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
partitioningProperties.getNumberOfPartitions());
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false,
indexDataflowHelperFactory, null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory,
partitionerFactory, partitioningProperties.getComputeStorageMap());
} else if (indexOp == IndexOperation.UPSERT) {
int operationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation,
indexDataflowHelperFactory, filterFactory, prevFilterFactory, modificationCallbackFactory,
operationFieldIndex, BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory,
partitioningProperties.getComputeStorageMap());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
indexDataflowHelperFactory, filterFactory, false, modificationCallbackFactory, partitionerFactory,
partitioningProperties.getComputeStorageMap());
}
return new Pair<>(op, partitioningProperties.getConstraints());
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInvertedIndexModificationRuntime(
DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema propagatedSchema,
List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
List<LogicalVariable> additionalNonKeyFields, AsterixTupleFilterFactory filterFactory,
AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, IndexOperation indexOp, IndexType indexType, boolean bulkload,
LogicalVariable operationVar, List<LogicalVariable> prevSecondaryKeys,
List<LogicalVariable> prevAdditionalFilteringKeys) throws AlgebricksException {
        // Check whether the index is length-partitioned.
        boolean isPartitioned = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
// Sanity checks.
if (primaryKeys.size() > 1) {
throw new AlgebricksException(
"Cannot create inverted index on " + dataset(PLURAL) + " with composite primary key.");
}
        // secondaryKeys can contain two entries when it receives input from a
        // TokenizeOperator: [token, number of tokens]
if ((secondaryKeys.size() > 1 && !isPartitioned) || (secondaryKeys.size() > 2 && isPartitioned)) {
throw new AlgebricksException("Cannot create composite inverted index on multiple fields.");
}
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
        // For tokenization, sorting, and loading: one token (+ an optional partitioning field)
        // followed by the primary keys, i.e. [token, number of tokens, PK]
int numKeys = primaryKeys.size() + secondaryKeys.size();
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
// generate field permutations
int[] fieldPermutation = new int[numKeys + numFilterFields];
int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
int[] pkFields = new int[primaryKeys.size()];
int i = 0;
int j = 0;
        // If the index is length-partitioned: [token, number of tokens]; otherwise: [token]
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
pkFields[j] = idx;
modificationCallbackPrimaryKeyFields[j] = i;
i++;
j++;
}
if (numFilterFields > 0) {
int idx = propagatedSchema.findVariable(additionalNonKeyFields.get(0));
fieldPermutation[numKeys] = idx;
}
int[] prevFieldPermutation = null;
if (indexOp == IndexOperation.UPSERT) {
            // Generate the field permutation for the previous record.
            prevFieldPermutation = new int[numKeys + numFilterFields];
            i = 0;
            // If the index is length-partitioned: [token, number of tokens]; otherwise: [token]
for (LogicalVariable varKey : prevSecondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
prevFieldPermutation[i] = idx;
i++;
}
            for (int k = 0; k < primaryKeys.size(); k++) {
                prevFieldPermutation[k + i] = fieldPermutation[k + i];
            }
if (numFilterFields > 0) {
int idx = propagatedSchema.findVariable(prevAdditionalFilteringKeys.get(0));
prevFieldPermutation[numKeys] = idx;
}
}
try {
// Index parameters.
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
PartitioningProperties partitioningProperties =
getPartitioningProperties(dataset, secondaryIndex.getIndexName());
// prepare callback
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
storageComponentProvider, secondaryIndex, indexOp, modificationCallbackPrimaryKeyFields);
IIndexDataflowHelperFactory indexDataFlowFactory = new IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(), partitioningProperties.getSplitsProvider());
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(this);
ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
partitioningProperties.getNumberOfPartitions());
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
op = new LSMIndexBulkLoadOperatorDescriptor(spec, recordDesc, fieldPermutation,
StorageConstants.DEFAULT_TREE_FILL_FACTOR, false, numElementsHint, false, indexDataFlowFactory,
null, BulkLoadUsage.LOAD, dataset.getDatasetId(), filterFactory, partitionerFactory,
partitioningProperties.getComputeStorageMap());
} else if (indexOp == IndexOperation.UPSERT) {
int upsertOperationFieldIndex = propagatedSchema.findVariable(operationVar);
op = new LSMSecondaryUpsertOperatorDescriptor(spec, recordDesc, fieldPermutation, indexDataFlowFactory,
filterFactory, prevFilterFactory, modificationCallbackFactory, upsertOperationFieldIndex,
BinaryIntegerInspector.FACTORY, prevFieldPermutation, partitionerFactory,
partitioningProperties.getComputeStorageMap());
} else {
op = new LSMTreeInsertDeleteOperatorDescriptor(spec, recordDesc, fieldPermutation, indexOp,
indexDataFlowFactory, filterFactory, false, modificationCallbackFactory, partitionerFactory,
partitioningProperties.getComputeStorageMap());
}
return new Pair<>(op, partitioningProperties.getConstraints());
} catch (Exception e) {
throw new AlgebricksException(e);
}
}
    // Gets a tokenizer for bulk-loading data into an n-gram or keyword index.
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBinaryTokenizerRuntime(
DataverseName dataverseName, String datasetName, String indexName, IOperatorSchema inputSchema,
IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys, List<LogicalVariable> secondaryKeys,
RecordDescriptor recordDesc, JobSpecification spec, IndexType indexType) throws AlgebricksException {
// Sanity checks.
if (primaryKeys.size() > 1) {
throw new AlgebricksException("Cannot tokenize composite primary key.");
}
if (secondaryKeys.size() > 1) {
throw new AlgebricksException("Cannot tokenize composite secondary key fields.");
}
        boolean isPartitioned = indexType == IndexType.LENGTH_PARTITIONED_WORD_INVIX
                || indexType == IndexType.LENGTH_PARTITIONED_NGRAM_INVIX;
        // Number of keys that need to be propagated
int numKeys = inputSchema.getSize();
        // Collect the logical variables that are neither primary nor secondary keys; these
        // variables are simply propagated through the TokenizeOperator.
        List<LogicalVariable> otherKeys = new ArrayList<>();
        for (int k = 0; k < inputSchema.getSize(); k++) {
            LogicalVariable inputVar = inputSchema.getVariable(k);
            boolean found = false;
            for (LogicalVariable varKey : primaryKeys) {
                if (varKey.equals(inputVar)) {
                    found = true;
                    break;
                }
            }
            if (!found) {
                for (LogicalVariable varKey : secondaryKeys) {
                    if (varKey.equals(inputVar)) {
                        found = true;
                        break;
                    }
                }
            }
            if (!found) {
                otherKeys.add(inputVar);
            }
        }
        // The tokenizer output contains all propagated input fields (PK, SK and the other
        // variables) plus one token and, for a length-partitioned index, the number of tokens.
        // The secondary keys and other variables are simply passed through to the
        // IndexInsertDelete operator.
int numTokenKeyPairFields = (!isPartitioned) ? 1 + numKeys : 2 + numKeys;
// generate field permutations for the input
int[] fieldPermutation = new int[numKeys];
int[] modificationCallbackPrimaryKeyFields = new int[primaryKeys.size()];
int i = 0;
int j = 0;
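        // fieldPermutation layout: [primary key, other propagated variables, secondary key];
        // the secondary key is last so it can serve as the tokenizer's docField below.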
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
modificationCallbackPrimaryKeyFields[j] = i;
i++;
j++;
}
for (LogicalVariable varKey : otherKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, dataverseName, datasetName);
String itemTypeName = dataset.getItemTypeName();
IAType itemType;
try {
itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getItemTypeDataverseName(), itemTypeName)
.getDatatype();
if (itemType.getTypeTag() != ATypeTag.OBJECT) {
throw new AlgebricksException("Only record types can be tokenized.");
}
ARecordType recType = (ARecordType) itemType;
// Index parameters.
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
Index.TextIndexDetails secondaryIndexDetails = (Index.TextIndexDetails) secondaryIndex.getIndexDetails();
List<List<String>> secondaryKeyExprs = secondaryIndexDetails.getKeyFieldNames();
List<IAType> secondaryKeyTypeEntries = secondaryIndexDetails.getKeyFieldTypes();
int numTokenFields = (!isPartitioned) ? secondaryKeys.size() : secondaryKeys.size() + 1;
ITypeTraits[] tokenTypeTraits = new ITypeTraits[numTokenFields];
ITypeTraits[] invListsTypeTraits = new ITypeTraits[primaryKeys.size()];
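            // tokenTypeTraits describe the token (plus the token count for a length-partitioned
            // index); invListsTypeTraits describe the primary key fields stored in the inverted lists.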
            // Find the type of the secondary key. If it is a derived type (e.g. an unordered list),
            // the derived type itself is returned.
IAType secondaryKeyType;
Pair<IAType, Boolean> keyPairType = Index.getNonNullableOpenFieldType(secondaryIndex,
secondaryKeyTypeEntries.get(0), secondaryKeyExprs.get(0), recType);
secondaryKeyType = keyPairType.first;
List<List<String>> partitioningKeys = dataset.getPrimaryKeys();
i = 0;
for (List<String> partitioningKey : partitioningKeys) {
IAType keyType = recType.getSubFieldType(partitioningKey);
invListsTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
tokenTypeTraits[0] = NonTaggedFormatUtil.getTokenTypeTrait(secondaryKeyType);
if (isPartitioned) {
// The partitioning field is hardcoded to be a short *without*
// an Asterix type tag.
tokenTypeTraits[1] = ShortPointable.TYPE_TRAITS;
}
IBinaryTokenizerFactory tokenizerFactory = NonTaggedFormatUtil.getBinaryTokenizerFactory(
secondaryKeyType.getTypeTag(), indexType, secondaryIndexDetails.getGramLength());
IFullTextConfigEvaluatorFactory fullTextConfigEvaluatorFactory =
FullTextUtil.fetchFilterAndCreateConfigEvaluator(this, secondaryIndex.getDataverseName(),
secondaryIndexDetails.getFullTextConfigName());
PartitioningProperties partitioningProperties =
getPartitioningProperties(dataset, secondaryIndex.getIndexName());
            // Generate the output record format
ISerializerDeserializer<?>[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
ITypeTraits[] tokenKeyPairTypeTraits = new ITypeTraits[numTokenKeyPairFields];
ISerializerDeserializerProvider serdeProvider = getDataFormat().getSerdeProvider();
            // The order of the output record: propagated variables (including PK and SK),
            // token, and number of tokens.
// #1. propagate all input variables
for (int k = 0; k < recordDesc.getFieldCount(); k++) {
tokenKeyPairFields[k] = recordDesc.getFields()[k];
tokenKeyPairTypeTraits[k] = recordDesc.getTypeTraits()[k];
}
int tokenOffset = recordDesc.getFieldCount();
// #2. Specify the token type
tokenKeyPairFields[tokenOffset] = serdeProvider.getSerializerDeserializer(secondaryKeyType);
tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[0];
tokenOffset++;
            // #3. Specify the length-partitioning key: the number of tokens
if (isPartitioned) {
tokenKeyPairFields[tokenOffset] = ShortSerializerDeserializer.INSTANCE;
tokenKeyPairTypeTraits[tokenOffset] = tokenTypeTraits[1];
}
RecordDescriptor tokenKeyPairRecDesc = new RecordDescriptor(tokenKeyPairFields, tokenKeyPairTypeTraits);
IOperatorDescriptor tokenizerOp;
            // Key to be tokenized: the secondary key (the last field in the permutation)
int docField = fieldPermutation[fieldPermutation.length - 1];
// Keys to be propagated
int[] keyFields = new int[numKeys];
for (int k = 0; k < keyFields.length; k++) {
keyFields[k] = k;
}
//TODO(partitioning) check
tokenizerOp = new BinaryTokenizerOperatorDescriptor(spec, tokenKeyPairRecDesc, tokenizerFactory,
fullTextConfigEvaluatorFactory, docField, keyFields, isPartitioned, true, false,
MissingWriterFactory.INSTANCE);
return new Pair<>(tokenizerOp, partitioningProperties.getConstraints());
} catch (Exception e) {
throw new AlgebricksException(e);
}
}
@Override
public AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
throws AlgebricksException {
// No filtering condition.
if (filterExpr == null) {
return null;
}
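        // Compile the filter expression into a scalar evaluator; the resulting factory keeps
        // only tuples for which the expression evaluates to true.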
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
IScalarEvaluatorFactory filterEvalFactory =
expressionRuntimeProvider.createEvaluatorFactory(filterExpr, typeEnv, inputSchemas, context);
return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory());
}
private void validateRecordType(IAType itemType) throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.OBJECT) {
throw new AlgebricksException("Only record types can be indexed.");
}
}
public IStorageComponentProvider getStorageComponentProvider() {
return storageComponentProvider;
}
public PartitioningProperties getPartitioningProperties(Index idx) throws AlgebricksException {
Dataset ds = findDataset(idx.getDataverseName(), idx.getDatasetName());
return getPartitioningProperties(ds, idx.getIndexName());
}
public PartitioningProperties getPartitioningProperties(Dataset ds) throws AlgebricksException {
return getPartitioningProperties(ds, ds.getDatasetName());
}
public PartitioningProperties getPartitioningProperties(Dataset ds, String indexName) throws AlgebricksException {
return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, ds, indexName);
}
public PartitioningProperties getPartitioningProperties(Feed feed) throws AlgebricksException {
return dataPartitioningProvider.getPartitioningProperties(feed);
}
public List<Index> getSecondaryIndexes(Dataset ds) throws AlgebricksException {
return getDatasetIndexes(ds.getDataverseName(), ds.getDatasetName()).stream()
.filter(idx -> idx.isSecondaryIndex() && !idx.isSampleIndex()).collect(Collectors.toList());
}
public LockList getLocks() {
return locks;
}
public ICcApplicationContext getApplicationContext() {
return appCtx;
}
public ITxnIdFactory getTxnIdFactory() {
return appCtx.getTxnIdFactory();
}
public ICompressionManager getCompressionManager() {
return appCtx.getCompressionManager();
}
public void validateDataverseName(DataverseName dataverseName, SourceLocation sourceLoc)
throws AlgebricksException {
int totalLengthUTF8 = 0;
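        // Each part of the dataverse name is validated individually; additionally, the first part
        // must not start with the storage-path continuation marker, and the total UTF-8 length of
        // all parts is bounded.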
for (String dvNamePart : dataverseName.getParts()) {
validateDatabaseObjectNameImpl(dvNamePart, sourceLoc);
if (totalLengthUTF8 == 0 && StoragePathUtil.DATAVERSE_CONTINUATION_MARKER == dvNamePart.codePointAt(0)) {
throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, sourceLoc, dvNamePart);
}
totalLengthUTF8 += dvNamePart.getBytes(StandardCharsets.UTF_8).length;
}
if (totalLengthUTF8 > MetadataConstants.DATAVERSE_NAME_TOTAL_LENGTH_LIMIT_UTF8) {
throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, sourceLoc, dataverseName.toString());
}
}
public void validateDatabaseObjectName(DataverseName dataverseName, String objectName, SourceLocation sourceLoc)
throws AlgebricksException {
if (dataverseName != null) {
validateDataverseName(dataverseName, sourceLoc);
}
validateDatabaseObjectNameImpl(objectName, sourceLoc);
}
private void validateDatabaseObjectNameImpl(String name, SourceLocation sourceLoc) throws AlgebricksException {
if (name == null || name.isEmpty()) {
throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, sourceLoc, "");
}
if (Character.isWhitespace(name.codePointAt(0)) || METADATA_OBJECT_NAME_INVALID_CHARS.matcher(name).find()) {
throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, sourceLoc, name);
}
int lengthUTF8 = name.getBytes(StandardCharsets.UTF_8).length;
if (lengthUTF8 > MetadataConstants.METADATA_OBJECT_NAME_LENGTH_LIMIT_UTF8) {
throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME, sourceLoc, name);
}
}
}