/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.metadata.declared;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import edu.uci.ics.asterix.external.adapter.factory.IAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.IGenericDatasetAdapterFactory;
import edu.uci.ics.asterix.external.adapter.factory.ITypedDatasetAdapterFactory;
import edu.uci.ics.asterix.external.data.operator.ExternalDataScanOperatorDescriptor;
import edu.uci.ics.asterix.external.data.operator.FeedIntakeOperatorDescriptor;
import edu.uci.ics.asterix.external.data.operator.FeedMessageOperatorDescriptor;
import edu.uci.ics.asterix.external.dataset.adapter.IDatasourceAdapter;
import edu.uci.ics.asterix.external.dataset.adapter.ITypedDatasourceAdapter;
import edu.uci.ics.asterix.external.feed.lifecycle.FeedId;
import edu.uci.ics.asterix.external.feed.lifecycle.IFeedMessage;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.bootstrap.AsterixProperties;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.asterix.runtime.base.AsterixTupleFilterFactory;
import edu.uci.ics.asterix.runtime.formats.FormatUtils;
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.asterix.runtime.transaction.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
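
/**
 * Implementation of IMetadataProvider that glues the Algebricks compiler to the
 * Asterix metadata (datasets, indexes, datatypes, adapters) and builds the
 * Hyracks operators and partition constraints for scans, writes, and index
 * maintenance, all within a single metadata transaction.
 */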
public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
private static final Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
private final MetadataTransactionContext mdTxnCtx;
private boolean isWriteTransaction;
private Map<String, String[]> stores;
private Map<String, String> config;
private IAWriterFactory writerFactory;
private FileSplit outputFile;
private long jobTxnId;
private final Dataverse defaultDataverse;
private static final Map<String, String> adapterFactoryMapping = initializeAdapterFactoryMapping();
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
}
public void setConfig(Map<String, String> config) {
this.config = config;
}
public Map<String, String[]> getAllStores() {
return stores;
}
public Map<String, String> getConfig() {
return config;
}
public AqlMetadataProvider(MetadataTransactionContext mdTxnCtx, Dataverse defaultDataverse) {
this.mdTxnCtx = mdTxnCtx;
this.defaultDataverse = defaultDataverse;
this.stores = AsterixProperties.INSTANCE.getStores();
}
public void setJobTxnId(long txnId) {
this.jobTxnId = txnId;
}
public Dataverse getDefaultDataverse() {
return defaultDataverse;
}
public String getDefaultDataverseName() {
return defaultDataverse == null ? null : defaultDataverse.getDataverseName();
}
public void setWriteTransaction(boolean writeTransaction) {
this.isWriteTransaction = writeTransaction;
}
public void setWriterFactory(IAWriterFactory writerFactory) {
this.writerFactory = writerFactory;
}
public MetadataTransactionContext getMetadataTxnContext() {
return mdTxnCtx;
}
public IAWriterFactory getWriterFactory() {
return this.writerFactory;
}
public FileSplit getOutputFile() {
return outputFile;
}
public void setOutputFile(FileSplit outputFile) {
this.outputFile = outputFile;
}
@Override
public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
try {
return lookupSourceInMetadata(id);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
}
public boolean isWriteTransaction() {
return isWriteTransaction;
}
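/**
 * Builds the scan runtime for a data source, dispatching on the dataset type:
 * internal and feed datasets are scanned through their primary B-tree index,
 * while external datasets (and external feeds) go through a datasource adapter.
 */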
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
List<LogicalVariable> projectVariables, boolean projectPushed, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec)
throws AlgebricksException {
Dataset dataset;
try {
dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataSource.getId().getDataverseName(), dataSource
.getId().getDatasetName());
if (dataset == null) {
throw new AlgebricksException("Unknown dataset " + dataSource.getId().getDatasetName()
+ " in dataverse " + dataSource.getId().getDataverseName());
}
switch (dataset.getDatasetType()) {
case FEED: {
if (dataSource instanceof ExternalFeedDataSource) {
return buildExternalDatasetScan(jobSpec, dataset, dataSource);
} else {
return buildInternalDatasetScan(jobSpec, scanVariables, opSchema, typeEnv, dataset, dataSource,
context);
}
}
case INTERNAL: {
return buildInternalDatasetScan(jobSpec, scanVariables, opSchema, typeEnv, dataset, dataSource,
context);
}
case EXTERNAL: {
return buildExternalDatasetScan(jobSpec, dataset, dataSource);
}
default: {
throw new IllegalArgumentException("Unknown dataset type: " + dataset.getDatasetType());
}
}
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
Dataset dataset, IDataSource<AqlSourceId> dataSource, JobGenContext context) throws AlgebricksException,
MetadataException {
AqlSourceId asid = dataSource.getId();
String dataverseName = asid.getDataverseName();
String datasetName = asid.getDatasetName();
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
return buildBtreeRuntime(jobSpec, outputVars, opSchema, typeEnv, context, false, dataset,
primaryIndex.getIndexName(), null, null, true, true);
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
Dataset dataset, IDataSource<AqlSourceId> dataSource) throws AlgebricksException, MetadataException {
String itemTypeName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
.getDatatype();
if (dataSource instanceof ExternalFeedDataSource) {
return buildFeedIntakeRuntime(jobSpec, dataset);
} else {
return buildExternalDataScannerRuntime(jobSpec, itemType,
(ExternalDatasetDetails) dataset.getDatasetDetails(), NonTaggedDataFormat.INSTANCE);
}
}
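/**
 * Creates a scanner for an external dataset. The adapter factory is resolved
 * from the metadata (user-installed adapters) or from the built-in mapping,
 * and the resulting adapter must support the READ or READ_WRITE mode.
 */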
@SuppressWarnings("rawtypes")
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
JobSpecification jobSpec, IAType itemType, ExternalDatasetDetails datasetDetails, IDataFormat format)
throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Can only scan datasets of records.");
}
IGenericDatasetAdapterFactory adapterFactory;
IDatasourceAdapter adapter;
String adapterName;
DatasourceAdapter adapterEntity;
String adapterFactoryClassname;
try {
adapterName = datasetDetails.getAdapter();
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
adapterName);
if (adapterEntity != null) {
adapterFactoryClassname = adapterEntity.getClassname();
adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
} else {
adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
if (adapterFactoryClassname == null) {
throw new AlgebricksException(" Unknown adapter :" + adapterName);
}
adapterFactory = (IGenericDatasetAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
}
adapter = adapterFactory.createAdapter(datasetDetails.getProperties(), itemType);
} catch (AlgebricksException ae) {
throw ae;
} catch (Exception e) {
throw new AlgebricksException("Unable to create adapter: " + e.getMessage(), e);
}
if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
IDatasourceAdapter.AdapterType.READ_WRITE))) {
throw new AlgebricksException("external dataset adapter does not support read operation");
}
ARecordType rt = (ARecordType) itemType;
ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
adapterFactoryClassname, datasetDetails.getProperties(), rt, scannerDesc);
AlgebricksPartitionConstraint constraint;
try {
constraint = adapter.getPartitionConstraint();
} catch (Exception e) {
throw new AlgebricksException(e);
}
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
}
@SuppressWarnings("rawtypes")
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(JobSpecification jobSpec,
IAType itemType, IParseFileSplitsDecl decl, IDataFormat format) throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Can only scan datasets of records.");
}
ARecordType rt = (ARecordType) itemType;
ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
FileSplit[] splits = decl.getSplits();
IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(splits);
ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec, scannerSplitProvider, tupleParser,
scannerDesc);
String[] locs = new String[splits.length];
for (int i = 0; i < splits.length; i++) {
locs[i] = splits[i].getNodeName();
}
AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(locs);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, apc);
}
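/**
 * Creates the feed intake operator. Typed adapters determine their own output
 * type; generic adapters are configured with the output type named in the
 * feed's properties.
 */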
@SuppressWarnings("rawtypes")
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(JobSpecification jobSpec,
Dataset dataset) throws AlgebricksException {
FeedDatasetDetails datasetDetails = (FeedDatasetDetails) dataset.getDatasetDetails();
DatasourceAdapter adapterEntity;
IDatasourceAdapter adapter;
IAdapterFactory adapterFactory;
IAType adapterOutputType;
String adapterName;
String adapterFactoryClassname;
try {
adapterName = datasetDetails.getAdapterFactory();
adapterEntity = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, MetadataConstants.METADATA_DATAVERSE_NAME,
adapterName);
if (adapterEntity != null) {
adapterFactoryClassname = adapterEntity.getClassname();
adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
} else {
adapterFactoryClassname = adapterFactoryMapping.get(adapterName);
if (adapterFactoryClassname == null) {
// adapterName was provided as a fully qualified class name
adapterFactoryClassname = adapterName;
}
adapterFactory = (IAdapterFactory) Class.forName(adapterFactoryClassname).newInstance();
}
if (adapterFactory instanceof ITypedDatasetAdapterFactory) {
adapter = ((ITypedDatasetAdapterFactory) adapterFactory).createAdapter(datasetDetails.getProperties());
adapterOutputType = ((ITypedDatasourceAdapter) adapter).getAdapterOutputType();
} else if (adapterFactory instanceof IGenericDatasetAdapterFactory) {
String outputTypeName = datasetDetails.getProperties().get(IGenericDatasetAdapterFactory.KEY_TYPE_NAME);
adapterOutputType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(),
outputTypeName).getDatatype();
adapter = ((IGenericDatasetAdapterFactory) adapterFactory).createAdapter(
datasetDetails.getProperties(), adapterOutputType);
} else {
throw new IllegalStateException(" Unknown factory type for " + adapterFactoryClassname);
}
} catch (AlgebricksException ae) {
throw ae;
} catch (Exception e) {
throw new AlgebricksException("Unable to create adapter: " + e.getMessage(), e);
}
ISerializerDeserializer payloadSerde = NonTaggedDataFormat.INSTANCE.getSerdeProvider()
.getSerializerDeserializer(adapterOutputType);
RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(
dataset.getDataverseName(), dataset.getDatasetName()), adapterFactoryClassname,
datasetDetails.getProperties(), (ARecordType) adapterOutputType, feedDesc);
AlgebricksPartitionConstraint constraint = null;
try {
constraint = adapter.getPartitionConstraint();
} catch (Exception e) {
throw new AlgebricksException(e);
}
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);
}
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
AqlMetadataProvider metadataProvider, JobSpecification jobSpec, FeedDatasetDetails datasetDetails,
String dataverse, String dataset, List<IFeedMessage> feedMessages) throws AlgebricksException {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = metadataProvider
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataverse, dataset, dataset);
FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
feedMessages);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, spPc.second);
}
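/**
 * Builds a B-tree search runtime for a primary or secondary index. The key
 * comparators and type traits are derived from the output variables; for a
 * primary index the extra payload field is included in the type traits.
 */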
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] lowKeyFields,
int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) throws AlgebricksException {
boolean isSecondary = true;
try {
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
if (primaryIndex != null) {
isSecondary = !indexName.equals(primaryIndex.getIndexName());
}
int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
int numKeys = numPrimaryKeys;
int keysStartIndex = outputRecDesc.getFieldCount() - numKeys - 1;
if (isSecondary) {
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
int numSecondaryKeys = secondaryIndex.getKeyFieldNames().size();
numKeys += numSecondaryKeys;
keysStartIndex = outputVars.size() - numKeys;
}
IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
outputVars, keysStartIndex, numKeys, typeEnv, context);
ITypeTraits[] typeTraits = null;
if (isSecondary) {
typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys, typeEnv, context);
} else {
typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex, numKeys + 1, typeEnv,
context);
}
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
try {
spPc = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first,
typeTraits, comparatorFactories, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
new BTreeDataflowHelperFactory(), retainInput, NoOpOperationCallbackProvider.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
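/**
 * Builds an R-tree search runtime for a spatial secondary index. The single
 * declared key field is decomposed into numDimensions * 2 nested key fields
 * (one low and one high coordinate per dimension).
 */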
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, Dataset dataset, String indexName, int[] keyFields)
throws AlgebricksException {
try {
ARecordType recType = (ARecordType) findType(dataset.getDataverseName(), dataset.getItemTypeName());
int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
if (secondaryIndex == null) {
throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+ dataset.getDatasetName());
}
List<String> secondaryKeyFields = secondaryIndex.getKeyFieldNames();
int numSecondaryKeys = secondaryKeyFields.size();
if (numSecondaryKeys != 1) {
throw new AlgebricksException("Cannot use " + numSecondaryKeys
+ " fields as the key of an R-tree index; an R-tree index requires exactly one key field.");
}
Pair<IAType, Boolean> keyTypePair = Index.getNonNullableKeyFieldType(secondaryKeyFields.get(0), recType);
IAType keyType = keyTypePair.first;
if (keyType == null) {
throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
}
int numDimensions = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
int numNestedSecondaryKeyFields = numDimensions * 2;
IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
for (int i = 0; i < numNestedSecondaryKeyFields; i++) {
valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
}
RecordDescriptor outputRecDesc = JobGenHelper.mkRecordDescriptor(typeEnv, opSchema, context);
int keysStartIndex = outputRecDesc.getFieldCount() - numNestedSecondaryKeyFields - numPrimaryKeys;
if (retainInput) {
keysStartIndex -= numNestedSecondaryKeyFields;
}
IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
outputVars, keysStartIndex, numNestedSecondaryKeyFields, typeEnv, context);
ITypeTraits[] typeTraits = JobGenHelper.variablesToTypeTraits(outputVars, keysStartIndex,
numNestedSecondaryKeyFields, typeEnv, context);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
dataset.getDataverseName(), dataset.getDatasetName(), indexName);
RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first,
typeTraits, comparatorFactories, keyFields, new RTreeDataflowHelperFactory(valueProviderFactories),
retainInput, NoOpOperationCallbackProvider.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
@Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
FileSplitDataSink fsds = (FileSplitDataSink) sink;
FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
FileSplit fs = fssi.getFileSplit();
File outFile = fs.getLocalFile().getFile();
String nodeId = fs.getNodeName();
SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
getWriterFactory(), inputDesc);
AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
}
@Override
public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
throws AlgebricksException {
AqlDataSource ads = findDataSource(dataSourceId);
Dataset dataset = ads.getDataset();
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
throw new AlgebricksException("No index for external dataset " + dataSourceId);
}
try {
String indexName = indexId;
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
if (secondaryIndex != null) {
return new AqlIndex(secondaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
} else {
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
if (primaryIndex.getIndexName().equals(indexId)) {
return new AqlIndex(primaryIndex, dataset.getDataverseName(), dataset.getDatasetName(), this);
} else {
return null;
}
}
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
public AqlDataSource lookupSourceInMetadata(AqlSourceId aqlId) throws AlgebricksException, MetadataException {
Dataset dataset = findDataset(aqlId.getDataverseName(), aqlId.getDatasetName());
if (dataset == null) {
throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
}
String tName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, aqlId.getDataverseName(), tName).getDatatype();
return new AqlDataSource(aqlId, dataset, itemType);
}
@Override
public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
AqlSourceId asid = dataSource.getId();
String dataverseName = asid.getDataverseName();
String datasetName = asid.getDatasetName();
Dataset dataset = null;
try {
dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
} catch (MetadataException e) {
throw new IllegalStateException(e);
}
if (dataset == null) {
throw new IllegalArgumentException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
}
return dataset.getDatasetType() == DatasetType.EXTERNAL;
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payload, JobGenContext context, JobSpecification spec) throws AlgebricksException {
String dataverseName = dataSource.getId().getDataverseName();
String datasetName = dataSource.getId().getDatasetName();
int numKeys = keys.size();
// Move key fields to front.
int[] fieldPermutation = new int[numKeys + 1];
int i = 0;
for (LogicalVariable varKey : keys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
Dataset dataset = findDataset(dataverseName, datasetName);
if (dataset == null) {
throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
}
try {
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
String indexName = primaryIndex.getIndexName();
String itemTypeName = dataset.getItemTypeName();
ARecordType itemType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
dataset.getDataverseName(), itemTypeName).getDatatype();
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
itemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
dataSource.getId().getDataverseName(), datasetName, indexName);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
NoOpOperationCallbackProvider.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
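/**
 * Common path for primary-index insert and delete: permutes the key fields to
 * the front, followed by the payload, and wires a
 * TreeIndexInsertUpdateDeleteOperatorDescriptor against the primary B-tree.
 */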
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertOrDeleteRuntime(IndexOp indexOp,
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
throws AlgebricksException {
String datasetName = dataSource.getId().getDatasetName();
int numKeys = keys.size();
// Move key fields to front.
int[] fieldPermutation = new int[numKeys + 1];
int i = 0;
for (LogicalVariable varKey : keys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
Dataset dataset = findDataset(dataSource.getId().getDataverseName(), datasetName);
if (dataset == null) {
throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse "
+ dataSource.getId().getDataverseName());
}
try {
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
String indexName = primaryIndex.getIndexName();
String itemTypeName = dataset.getItemTypeName();
ARecordType itemType = (ARecordType) MetadataManager.INSTANCE.getDatatype(mdTxnCtx,
dataSource.getId().getDataverseName(), itemTypeName).getDatatype();
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(dataset, itemType);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
itemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
dataSource.getId().getDataverseName(), datasetName, indexName);
TreeIndexInsertUpdateDeleteOperatorDescriptor btreeInsertOrDelete = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, jobTxnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeInsertOrDelete,
splitsAndConstraint.second);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
throws AlgebricksException {
return getInsertOrDeleteRuntime(IndexOp.INSERT, dataSource, propagatedSchema, keys, payload, recordDesc,
context, spec);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
throws AlgebricksException {
return getInsertOrDeleteRuntime(IndexOp.DELETE, dataSource, propagatedSchema, keys, payload, recordDesc,
context, spec);
}
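/**
 * Common path for secondary-index insert and delete; dispatches on the index
 * type (B-tree or R-tree) and applies an optional tuple filter.
 */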
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertOrDeleteRuntime(IndexOp indexOp,
IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
String dataverseName = dataSourceIndex.getDataSource().getId().getDataverseName();
String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
Dataset dataset = findDataset(dataverseName, datasetName);
if (dataset == null) {
throw new AlgebricksException("Unknown dataset " + datasetName);
}
Index secondaryIndex;
try {
secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
switch (secondaryIndex.getIndexType()) {
case BTREE: {
return getBTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, filterFactory, recordDesc, context, spec, indexOp);
}
case RTREE: {
return getRTreeDmlRuntime(dataverseName, datasetName, indexName, propagatedSchema, primaryKeys,
secondaryKeys, filterFactory, recordDesc, context, spec, indexOp);
}
default: {
throw new AlgebricksException("Insert and delete not implemented for index type: "
+ secondaryIndex.getIndexType());
}
}
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec) throws AlgebricksException {
return getIndexInsertOrDeleteRuntime(IndexOp.INSERT, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec) throws AlgebricksException {
return getIndexInsertOrDeleteRuntime(IndexOp.DELETE, dataSourceIndex, propagatedSchema, inputSchemas, typeEnv,
primaryKeys, secondaryKeys, filterExpr, recordDesc, context, spec);
}
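/**
 * Compiles an optional filter expression into a tuple filter factory; returns
 * null when no filter is given.
 */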
private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
throws AlgebricksException {
// No filtering condition.
if (filterExpr == null) {
return null;
}
IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
IScalarEvaluatorFactory filterEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(filterExpr,
typeEnv, inputSchemas, context);
return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspectorFactory());
}
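/**
 * Builds the DML runtime for a secondary B-tree index. The field permutation
 * places the secondary keys first, then the primary keys; comparator factories
 * and type traits are laid out in the same order.
 */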
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String dataverseName,
String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
int numKeys = primaryKeys.size() + secondaryKeys.size();
// Generate the field permutation: secondary keys first, then primary keys.
int[] fieldPermutation = new int[numKeys];
int i = 0;
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
Dataset dataset = findDataset(dataverseName, datasetName);
if (dataset == null) {
throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
}
String itemTypeName = dataset.getItemTypeName();
IAType itemType;
try {
itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataset.getDataverseName(), itemTypeName)
.getDatatype();
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Only record types can be indexed.");
}
ARecordType recType = (ARecordType) itemType;
// Index parameters.
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
List<String> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
for (i = 0; i < secondaryKeys.size(); ++i) {
Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(secondaryKeyExprs.get(i)
.toString(), recType);
IAType keyType = keyPairType.first;
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
keyType, true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
for (String partitioningKey : partitioningKeys) {
IAType keyType = recType.getFieldType(partitioningKey);
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
keyType, true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
dataverseName, datasetName, indexName);
TreeIndexInsertUpdateDeleteOperatorDescriptor btreeInsert = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
new BTreeDataflowHelperFactory(), filterFactory, NoOpOperationCallbackProvider.INSTANCE, jobTxnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeInsert, splitsAndConstraint.second);
} catch (MetadataException | IOException e) {
throw new AlgebricksException(e);
}
}
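/**
 * Builds the DML runtime for a secondary R-tree index. The spatial key is
 * expanded into dimension * 2 nested keys compared on the nested spatial type,
 * followed by the primary keys.
 */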
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String dataverseName,
String datasetName, String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
try {
Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
String itemTypeName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, itemTypeName).getDatatype();
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Only record types can be indexed.");
}
ARecordType recType = (ARecordType) itemType;
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
List<String> secondaryKeyExprs = secondaryIndex.getKeyFieldNames();
Pair<IAType, Boolean> keyPairType = Index.getNonNullableKeyFieldType(secondaryKeyExprs.get(0), recType);
IAType spatialType = keyPairType.first;
int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
int numSecondaryKeys = dimension * 2;
int numPrimaryKeys = primaryKeys.size();
int numKeys = numSecondaryKeys + numPrimaryKeys;
ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
int[] fieldPermutation = new int[numKeys];
int i = 0;
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
for (i = 0; i < numSecondaryKeys; i++) {
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
nestedKeyType, true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
}
List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
for (String partitioningKey : partitioningKeys) {
IAType keyType = recType.getFieldType(partitioningKey);
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
keyType, true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
dataverseName, datasetName, indexName);
TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
new RTreeDataflowHelperFactory(valueProviderFactories), filterFactory,
NoOpOperationCallbackProvider.INSTANCE, jobTxnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
} catch (MetadataException | IOException e) {
throw new AlgebricksException(e);
}
}
public long getJobTxnId() {
return jobTxnId;
}
public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
}
@Override
public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
return AsterixBuiltinFunctions.lookupFunction(fid);
}
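/**
 * Computes the file splits for the given index of an internal or feed dataset
 * and wraps them in a split provider plus an absolute partition constraint on
 * the nodes that host the splits.
 */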
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
String dataverseName, String datasetName, String targetIdxName) throws AlgebricksException {
FileSplit[] splits = splitsForInternalOrFeedDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName);
IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
String[] loc = new String[splits.length];
for (int p = 0; p < splits.length; p++) {
loc[p] = splits[p].getNodeName();
}
AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
}
private FileSplit[] splitsForInternalOrFeedDataset(MetadataTransactionContext mdTxnCtx, String dataverseName,
String datasetName, String targetIdxName) throws AlgebricksException {
try {
File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
if (dataset.getDatasetType() != DatasetType.INTERNAL && dataset.getDatasetType() != DatasetType.FEED) {
throw new AlgebricksException("Not an internal or feed dataset");
}
InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dataset.getDatasetDetails();
List<String> nodeGroup = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, datasetDetails.getNodeGroupName())
.getNodeNames();
if (nodeGroup == null) {
throw new AlgebricksException("Couldn't find node group " + datasetDetails.getNodeGroupName());
}
List<FileSplit> splitArray = new ArrayList<FileSplit>();
for (String nd : nodeGroup) {
String[] nodeStores = stores.get(nd);
if (nodeStores == null) {
LOGGER.warning("Node " + nd + " has no stores.");
throw new AlgebricksException("Node " + nd + " has no stores.");
} else {
for (int j = 0; j < nodeStores.length; j++) {
File f = new File(nodeStores[j] + File.separator + relPathFile);
splitArray.add(new FileSplit(nd, new FileReference(f)));
}
}
}
return splitArray.toArray(new FileSplit[splitArray.size()]);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
}
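/**
 * Static mapping from the built-in adapter class names to their factory class
 * names, used when an adapter is not registered in the metadata.
 */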
private static Map<String, String> initializeAdapterFactoryMapping() {
Map<String, String> adapterFactoryMapping = new HashMap<String, String>();
adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.NCFileSystemAdapter",
"edu.uci.ics.asterix.external.adapter.factory.NCFileSystemAdapterFactory");
adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.HDFSAdapter",
"edu.uci.ics.asterix.external.adapter.factory.HDFSAdapterFactory");
adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapter",
"edu.uci.ics.asterix.external.dataset.adapter.PullBasedTwitterAdapterFactory");
adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.RSSFeedAdapter",
"edu.uci.ics.asterix.external.dataset.adapter..RSSFeedAdapterFactory");
adapterFactoryMapping.put("edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapter",
"edu.uci.ics.asterix.external.dataset.adapter.CNNFeedAdapterFactory");
return adapterFactoryMapping;
}
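/**
 * Resolves an adapter by name, preferring the built-in adapters registered in
 * the metadata dataverse over user-defined adapters in the given dataverse.
 */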
public DatasourceAdapter getAdapter(MetadataTransactionContext mdTxnCtx, String dataverseName, String adapterName)
throws MetadataException {
// Search the default namespace first (built-in adapters).
DatasourceAdapter adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx,
MetadataConstants.METADATA_DATAVERSE_NAME, adapterName);
// Fall back to the target dataverse (user-defined adapters).
if (adapter == null) {
adapter = MetadataManager.INSTANCE.getAdapter(mdTxnCtx, dataverseName, adapterName);
}
return adapter;
}
private static String getRelativePath(String dataverseName, String fileName) {
return dataverseName + File.separator + fileName;
}
public Pair<IFileSplitProvider, IFileSplitProvider> getInvertedIndexFileSplitProviders(
IFileSplitProvider splitProvider) {
int numSplits = splitProvider.getFileSplits().length;
FileSplit[] btreeSplits = new FileSplit[numSplits];
FileSplit[] invListsSplits = new FileSplit[numSplits];
for (int i = 0; i < numSplits; i++) {
String nodeName = splitProvider.getFileSplits()[i].getNodeName();
String path = splitProvider.getFileSplits()[i].getLocalFile().getFile().getPath();
btreeSplits[i] = new FileSplit(nodeName, path + "_$btree");
invListsSplits[i] = new FileSplit(nodeName, path + "_$invlists");
}
return new Pair<IFileSplitProvider, IFileSplitProvider>(new ConstantFileSplitProvider(btreeSplits),
new ConstantFileSplitProvider(invListsSplits));
}
public Dataset findDataset(String dataverse, String dataset) throws AlgebricksException {
try {
return MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, dataset);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
}
public IAType findType(String dataverse, String typeName) {
Datatype type;
try {
type = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverse, typeName);
} catch (Exception e) {
throw new IllegalStateException("Could not look up datatype " + typeName + " in dataverse " + dataverse, e);
}
if (type == null) {
throw new IllegalStateException("Datatype " + typeName + " was not found in dataverse " + dataverse);
}
return type.getDatatype();
}
public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
try {
return MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
}
public AlgebricksPartitionConstraint getClusterLocations() {
ArrayList<String> locs = new ArrayList<String>();
for (String k : stores.keySet()) {
String[] nodeStores = stores.get(k);
for (int j = 0; j < nodeStores.length; j++) {
locs.add(k);
}
}
String[] cluster = new String[locs.size()];
cluster = locs.toArray(cluster);
return new AlgebricksAbsolutePartitionConstraint(cluster);
}
public IDataFormat getFormat() {
return FormatUtils.getDefaultFormat();
}
}