blob: d8e340c650043c869bcc3d03f817efba5cca34ea [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.metadata.declared;
import java.io.File;
import java.util.List;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.dataflow.IAsterixApplicationContextInfo;
import edu.uci.ics.asterix.common.parse.IParseFileSplitsDecl;
import edu.uci.ics.asterix.dataflow.data.nontagged.valueproviders.AqlPrimitiveValueProviderFactory;
import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceAdapter;
import edu.uci.ics.asterix.external.data.adapter.api.IDatasourceReadAdapter;
import edu.uci.ics.asterix.external.data.operator.ExternalDataScanOperatorDescriptor;
import edu.uci.ics.asterix.feed.comm.IFeedMessage;
import edu.uci.ics.asterix.feed.mgmt.FeedId;
import edu.uci.ics.asterix.feed.operator.FeedIntakeOperatorDescriptor;
import edu.uci.ics.asterix.feed.operator.FeedMessageOperatorDescriptor;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.formats.nontagged.AqlTypeTraitProvider;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.ATypeTag;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.util.NonTaggedFormatUtil;
import edu.uci.ics.asterix.runtime.base.AsterixTupleFilterFactory;
import edu.uci.ics.asterix.runtime.transaction.TreeIndexInsertUpdateDeleteOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ILogicalExpressionJobGen;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSink;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourceIndex;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
import edu.uci.ics.hyracks.algebricks.core.jobgen.impl.JobGenContext;
import edu.uci.ics.hyracks.algebricks.data.IPrinterFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMLeafFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, String> {
private final long txnId;
private boolean isWriteTransaction;
private final AqlCompiledMetadataDeclarations metadata;
public AqlMetadataProvider(long txnId, boolean isWriteTransaction, AqlCompiledMetadataDeclarations metadata) {
this.txnId = txnId;
this.isWriteTransaction = isWriteTransaction;
this.metadata = metadata;
}
@Override
public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
AqlSourceId aqlId = (AqlSourceId) id;
return lookupSourceInMetadata(metadata, aqlId);
}
public AqlCompiledMetadataDeclarations getMetadataDeclarations() {
return metadata;
}
public boolean isWriteTransaction() {
return isWriteTransaction;
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
List<LogicalVariable> projectVariables, boolean projectPushed, JobGenContext context,
JobSpecification jobSpec) throws AlgebricksException {
AqlCompiledDatasetDecl adecl = metadata.findDataset(dataSource.getId().getDatasetName());
if (adecl == null) {
throw new AlgebricksException("Unknown dataset " + dataSource.getId().getDatasetName());
}
switch (adecl.getDatasetType()) {
case FEED:
if (dataSource instanceof ExternalFeedDataSource) {
return buildExternalDatasetScan(jobSpec, adecl, dataSource);
} else {
return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
}
case INTERNAL: {
return buildInternalDatasetScan(jobSpec, adecl, dataSource, context);
}
case EXTERNAL: {
return buildExternalDatasetScan(jobSpec, adecl, dataSource);
}
default: {
throw new IllegalArgumentException();
}
}
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildInternalDatasetScan(JobSpecification jobSpec,
AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource, JobGenContext context)
throws AlgebricksException {
AqlSourceId asid = dataSource.getId();
String datasetName = asid.getDatasetName();
String indexName = DatasetUtils.getPrimaryIndex(acedl).getIndexName();
try {
return buildBtreeRuntime(metadata, context, jobSpec, datasetName, acedl, indexName, null, null, true, true);
} catch (AlgebricksException e) {
throw new AlgebricksException(e);
}
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDatasetScan(JobSpecification jobSpec,
AqlCompiledDatasetDecl acedl, IDataSource<AqlSourceId> dataSource) throws AlgebricksException {
String itemTypeName = acedl.getItemTypeName();
IAType itemType;
try {
itemType = metadata.findType(itemTypeName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
if (dataSource instanceof ExternalFeedDataSource) {
AqlCompiledFeedDatasetDetails acfdd = (AqlCompiledFeedDatasetDetails) ((ExternalFeedDataSource) dataSource)
.getCompiledDatasetDecl().getAqlCompiledDatasetDetails();
return buildFeedIntakeRuntime(jobSpec, metadata.getDataverseName(), acedl.getName(), itemType, acfdd,
metadata.getFormat());
} else {
return buildExternalDataScannerRuntime(jobSpec, itemType,
(AqlCompiledExternalDatasetDetails) acedl.getAqlCompiledDatasetDetails(), metadata.getFormat());
}
}
@SuppressWarnings("rawtypes")
public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildExternalDataScannerRuntime(
JobSpecification jobSpec, IAType itemType, AqlCompiledExternalDatasetDetails decl, IDataFormat format)
throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Can only scan datasets of records.");
}
IDatasourceReadAdapter adapter;
try {
adapter = (IDatasourceReadAdapter) Class.forName(decl.getAdapter()).newInstance();
} catch (Exception e) {
e.printStackTrace();
throw new AlgebricksException("unable to load the adapter class " + e);
}
if (!(adapter.getAdapterType().equals(IDatasourceAdapter.AdapterType.READ) || adapter.getAdapterType().equals(
IDatasourceAdapter.AdapterType.READ_WRITE))) {
throw new AlgebricksException("external dataset adapter does not support read operation");
}
ARecordType rt = (ARecordType) itemType;
try {
adapter.configure(decl.getProperties(), itemType);
} catch (Exception e) {
e.printStackTrace();
throw new AlgebricksException("unable to configure the datasource adapter " + e);
}
ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
ExternalDataScanOperatorDescriptor dataScanner = new ExternalDataScanOperatorDescriptor(jobSpec,
decl.getAdapter(), decl.getProperties(), rt, scannerDesc);
dataScanner.setDatasourceAdapter(adapter);
AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(dataScanner, constraint);
}
@SuppressWarnings("rawtypes")
public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildScannerRuntime(
JobSpecification jobSpec, IAType itemType, IParseFileSplitsDecl decl, IDataFormat format)
throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Can only scan datasets of records.");
}
ARecordType rt = (ARecordType) itemType;
ITupleParserFactory tupleParser = format.createTupleParser(rt, decl);
FileSplit[] splits = decl.getSplits();
IFileSplitProvider scannerSplitProvider = new ConstantFileSplitProvider(splits);
ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
IOperatorDescriptor scanner = new FileScanOperatorDescriptor(jobSpec, scannerSplitProvider, tupleParser,
scannerDesc);
String[] locs = new String[splits.length];
for (int i = 0; i < splits.length; i++) {
locs[i] = splits[i].getNodeName();
}
AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(locs);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(scanner, apc);
}
@SuppressWarnings("rawtypes")
public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedIntakeRuntime(
JobSpecification jobSpec, String dataverse, String dataset, IAType itemType,
AqlCompiledFeedDatasetDetails decl, IDataFormat format) throws AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Can only consume records.");
}
IDatasourceAdapter adapter;
try {
adapter = (IDatasourceAdapter) Class.forName(decl.getAdapter()).newInstance();
} catch (Exception e) {
e.printStackTrace();
throw new AlgebricksException("unable to load the adapter class " + e);
}
ARecordType rt = (ARecordType) itemType;
try {
adapter.configure(decl.getProperties(), itemType);
} catch (Exception e) {
e.printStackTrace();
throw new AlgebricksException("unable to configure the datasource adapter " + e);
}
ISerializerDeserializer payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
RecordDescriptor feedDesc = new RecordDescriptor(new ISerializerDeserializer[] { payloadSerde });
FeedIntakeOperatorDescriptor feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedId(dataverse,
dataset), decl.getAdapter(), decl.getProperties(), rt, feedDesc);
AlgebricksPartitionConstraint constraint = adapter.getPartitionConstraint();
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedIngestor, constraint);
}
@SuppressWarnings("rawtypes")
public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildFeedMessengerRuntime(
JobSpecification jobSpec, AqlCompiledMetadataDeclarations metadata, AqlCompiledFeedDatasetDetails decl,
String dataverse, String dataset, List<IFeedMessage> feedMessages) throws AlgebricksException {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
try {
spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(dataset, dataset);
} catch (Exception e) {
throw new AlgebricksException(e);
}
FeedMessageOperatorDescriptor feedMessenger = new FeedMessageOperatorDescriptor(jobSpec, dataverse, dataset,
feedMessages);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(feedMessenger, spPc.second);
}
@SuppressWarnings("rawtypes")
public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildBtreeRuntime(
AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] lowKeyFields,
int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) throws AlgebricksException {
String itemTypeName = ddecl.getItemTypeName();
IAType itemType;
try {
itemType = metadata.findType(itemTypeName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
boolean isSecondary = true;
AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
if (primIdxDecl != null) {
isSecondary = !indexName.equals(primIdxDecl.getIndexName());
}
int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
ISerializerDeserializer[] recordFields;
IBinaryComparatorFactory[] comparatorFactories;
ITypeTraits[] typeTraits;
int numSecondaryKeys = 0;
int i = 0;
if (isSecondary) {
AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
if (cid == null) {
throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+ datasetName);
}
List<String> secondaryKeyFields = cid.getFieldExprs();
numSecondaryKeys = secondaryKeyFields.size();
int numKeys = numSecondaryKeys + numPrimaryKeys;
recordFields = new ISerializerDeserializer[numKeys];
typeTraits = new ITypeTraits[numKeys];
// comparatorFactories = new
// IBinaryComparatorFactory[numSecondaryKeys];
comparatorFactories = new IBinaryComparatorFactory[numKeys];
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Only record types can be indexed.");
}
ARecordType recType = (ARecordType) itemType;
for (i = 0; i < numSecondaryKeys; i++) {
Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
secondaryKeyFields.get(i), recType);
IAType keyType = keyTypePair.first;
ISerializerDeserializer keySerde = metadata.getFormat().getSerdeProvider()
.getSerializerDeserializer(keyType);
recordFields[i] = keySerde;
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
keyType, true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
} else {
recordFields = new ISerializerDeserializer[numPrimaryKeys + 1];
comparatorFactories = new IBinaryComparatorFactory[numPrimaryKeys];
typeTraits = new ITypeTraits[numPrimaryKeys + 1];
ISerializerDeserializer payloadSerde = metadata.getFormat().getSerdeProvider()
.getSerializerDeserializer(itemType);
recordFields[numPrimaryKeys] = payloadSerde;
typeTraits[numPrimaryKeys] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(itemType);
}
for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
.getPartitioningFunctions(ddecl)) {
IAType keyType = evalFactoryAndType.third;
ISerializerDeserializer keySerde = metadata.getFormat().getSerdeProvider()
.getSerializerDeserializer(keyType);
recordFields[i] = keySerde;
// if (!isSecondary) {
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
true);
// }
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
RecordDescriptor recDesc = new RecordDescriptor(recordFields);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
try {
spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
BTreeSearchOperatorDescriptor btreeSearchOp = new BTreeSearchOperatorDescriptor(jobSpec, recDesc,
appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
comparatorFactories, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive,
new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeSearchOp, spPc.second);
}
@SuppressWarnings("rawtypes")
public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildRtreeRuntime(
AqlCompiledMetadataDeclarations metadata, JobGenContext context, JobSpecification jobSpec,
String datasetName, AqlCompiledDatasetDecl ddecl, String indexName, int[] keyFields)
throws AlgebricksException {
String itemTypeName = ddecl.getItemTypeName();
IAType itemType;
try {
itemType = metadata.findType(itemTypeName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
boolean isSecondary = true;
AqlCompiledIndexDecl primIdxDecl = DatasetUtils.getPrimaryIndex(ddecl);
if (primIdxDecl != null) {
isSecondary = !indexName.equals(primIdxDecl.getIndexName());
}
int numPrimaryKeys = DatasetUtils.getPartitioningFunctions(ddecl).size();
ISerializerDeserializer[] recordFields;
IBinaryComparatorFactory[] comparatorFactories;
ITypeTraits[] typeTraits;
IPrimitiveValueProviderFactory[] valueProviderFactories;
int numSecondaryKeys = 0;
int numNestedSecondaryKeyFields = 0;
int i = 0;
if (isSecondary) {
AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(ddecl, indexName);
if (cid == null) {
throw new AlgebricksException("Code generation error: no index " + indexName + " for dataset "
+ datasetName);
}
List<String> secondaryKeyFields = cid.getFieldExprs();
numSecondaryKeys = secondaryKeyFields.size();
if (numSecondaryKeys != 1) {
throw new AlgebricksException(
"Cannot use "
+ numSecondaryKeys
+ " fields as a key for the R-tree index. There can be only one field as a key for the R-tree index.");
}
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Only record types can be indexed.");
}
ARecordType recType = (ARecordType) itemType;
Pair<IAType, Boolean> keyTypePair = AqlCompiledIndexDecl.getNonNullableKeyFieldType(
secondaryKeyFields.get(0), recType);
IAType keyType = keyTypePair.first;
if (keyType == null) {
throw new AlgebricksException("Could not find field " + secondaryKeyFields.get(0) + " in the schema.");
}
int dimension = NonTaggedFormatUtil.getNumDimensions(keyType.getTypeTag());
numNestedSecondaryKeyFields = dimension * 2;
int numFields = numNestedSecondaryKeyFields + numPrimaryKeys;
recordFields = new ISerializerDeserializer[numFields];
typeTraits = new ITypeTraits[numFields];
comparatorFactories = new IBinaryComparatorFactory[numNestedSecondaryKeyFields];
valueProviderFactories = new IPrimitiveValueProviderFactory[numNestedSecondaryKeyFields];
IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
for (i = 0; i < numNestedSecondaryKeyFields; i++) {
ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(nestedKeyType);
recordFields[i] = keySerde;
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
nestedKeyType, true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
}
} else {
throw new AlgebricksException("R-tree can only be used as a secondary index");
}
for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
.getPartitioningFunctions(ddecl)) {
IAType keyType = evalFactoryAndType.third;
ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(keyType);
recordFields[i] = keySerde;
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
RecordDescriptor recDesc = new RecordDescriptor(recordFields);
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
try {
spPc = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, indexName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
RTreeSearchOperatorDescriptor rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, recDesc,
appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(), spPc.first, typeTraits,
comparatorFactories, keyFields, new RTreeDataflowHelperFactory(valueProviderFactories),
NoOpOperationCallbackProvider.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeSearchOp, spPc.second);
}
@Override
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
FileSplitDataSink fsds = (FileSplitDataSink) sink;
FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
FileSplit fs = fssi.getFileSplit();
File outFile = fs.getLocalFile().getFile();
String nodeId = fs.getNodeName();
SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
metadata.getWriterFactory(), inputDesc);
AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId });
return new Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint>(runtime, apc);
}
@Override
public IDataSourceIndex<String, AqlSourceId> findDataSourceIndex(String indexId, AqlSourceId dataSourceId)
throws AlgebricksException {
AqlDataSource ads = findDataSource(dataSourceId);
AqlCompiledDatasetDecl adecl = ads.getCompiledDatasetDecl();
if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
throw new AlgebricksException("No index for external dataset " + dataSourceId);
}
String idxName = (String) indexId;
AqlCompiledIndexDecl acid = DatasetUtils.findSecondaryIndexByName(adecl, idxName);
AqlSourceId asid = (AqlSourceId) dataSourceId;
if (acid != null) {
return new AqlIndex(acid, metadata, asid.getDatasetName());
} else {
AqlCompiledIndexDecl primIdx = DatasetUtils.getPrimaryIndex(adecl);
if (primIdx.getIndexName().equals(indexId)) {
return new AqlIndex(primIdx, metadata, asid.getDatasetName());
} else {
return null;
}
}
}
public static AqlDataSource lookupSourceInMetadata(AqlCompiledMetadataDeclarations metadata, AqlSourceId aqlId)
throws AlgebricksException {
if (!aqlId.getDataverseName().equals(metadata.getDataverseName())) {
return null;
}
AqlCompiledDatasetDecl acdd = metadata.findDataset(aqlId.getDatasetName());
if (acdd == null) {
throw new AlgebricksException("Datasource with id " + aqlId + " was not found.");
}
String tName = acdd.getItemTypeName();
IAType itemType;
try {
itemType = metadata.findType(tName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
return new AqlDataSource(aqlId, acdd, itemType);
}
@Override
public boolean scannerOperatorIsLeaf(IDataSource<AqlSourceId> dataSource) {
AqlSourceId asid = dataSource.getId();
String datasetName = asid.getDatasetName();
AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
if (adecl == null) {
throw new IllegalArgumentException("Unknown dataset " + datasetName);
}
return adecl.getDatasetType() == DatasetType.EXTERNAL;
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payload, JobGenContext context, JobSpecification spec) throws AlgebricksException {
String datasetName = dataSource.getId().getDatasetName();
int numKeys = keys.size();
// move key fields to front
int[] fieldPermutation = new int[numKeys + 1];
// System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
int i = 0;
for (LogicalVariable varKey : keys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
throw new AlgebricksException("Unknown dataset " + datasetName);
}
String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
try {
splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
indexName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(),
NoOpOperationCallbackProvider.INSTANCE);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getInsertRuntime(
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
throws AlgebricksException {
String datasetName = dataSource.getId().getDatasetName();
int numKeys = keys.size();
// move key fields to front
int[] fieldPermutation = new int[numKeys + 1];
// System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
int i = 0;
for (LogicalVariable varKey : keys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
throw new AlgebricksException("Unknown dataset " + datasetName);
}
String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
try {
splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
indexName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, IndexOp.INSERT,
new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, txnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getDeleteRuntime(
IDataSource<AqlSourceId> dataSource, IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payload, RecordDescriptor recordDesc, JobGenContext context, JobSpecification spec)
throws AlgebricksException {
String datasetName = dataSource.getId().getDatasetName();
int numKeys = keys.size();
// move key fields to front
int[] fieldPermutation = new int[numKeys + 1];
// System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
int i = 0;
for (LogicalVariable varKey : keys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
throw new AlgebricksException("Unknown dataset " + datasetName);
}
String indexName = DatasetUtils.getPrimaryIndex(compiledDatasetDecl).getIndexName();
ITypeTraits[] typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
compiledDatasetDecl, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
try {
splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
indexName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, IndexOp.DELETE,
new BTreeDataflowHelperFactory(), null, NoOpOperationCallbackProvider.INSTANCE, txnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexInsertRuntime(
IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
throw new AlgebricksException("Unknown dataset " + datasetName);
}
AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
if (cid.getKind() == IndexKind.BTREE) {
return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
filterFactory, recordDesc, context, spec, IndexOp.INSERT);
} else {
return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
filterFactory, recordDesc, context, spec, IndexOp.INSERT);
}
}
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getIndexDeleteRuntime(
IDataSourceIndex<String, AqlSourceId> dataSourceIndex, IOperatorSchema propagatedSchema,
IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec) throws AlgebricksException {
String indexName = dataSourceIndex.getId();
String datasetName = dataSourceIndex.getDataSource().getId().getDatasetName();
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
throw new AlgebricksException("Unknown dataset " + datasetName);
}
AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
AsterixTupleFilterFactory filterFactory = createTupleFilterFactory(inputSchemas, typeEnv, filterExpr, context);
if (cid.getKind() == IndexKind.BTREE) {
return getBTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
filterFactory, recordDesc, context, spec, IndexOp.DELETE);
} else {
return getRTreeDmlRuntime(datasetName, indexName, propagatedSchema, primaryKeys, secondaryKeys,
filterFactory, recordDesc, context, spec, IndexOp.DELETE);
}
}
private AsterixTupleFilterFactory createTupleFilterFactory(IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv, ILogicalExpression filterExpr, JobGenContext context)
throws AlgebricksException {
// No filtering condition.
if (filterExpr == null) {
return null;
}
ILogicalExpressionJobGen exprJobGen = context.getExpressionJobGen();
IEvaluatorFactory filterEvalFactory = exprJobGen.createEvaluatorFactory(filterExpr, typeEnv, inputSchemas,
context);
return new AsterixTupleFilterFactory(filterEvalFactory, context.getBinaryBooleanInspector());
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBTreeDmlRuntime(String datasetName,
String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
int numKeys = primaryKeys.size() + secondaryKeys.size();
// generate field permutations
int[] fieldPermutation = new int[numKeys];
int i = 0;
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
// dataset
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
throw new AlgebricksException("Unknown dataset " + datasetName);
}
String itemTypeName = compiledDatasetDecl.getItemTypeName();
IAType itemType;
try {
itemType = metadata.findType(itemTypeName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Only record types can be indexed.");
}
ARecordType recType = (ARecordType) itemType;
// index parameters
AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
List<String> secondaryKeyExprs = cid.getFieldExprs();
ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
for (i = 0; i < secondaryKeys.size(); ++i) {
IAType keyType = AqlCompiledIndexDecl.keyFieldType(secondaryKeyExprs.get(i).toString(), recType);
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
}
for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
.getPartitioningFunctions(compiledDatasetDecl)) {
IAType keyType = evalFactoryAndType.third;
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
try {
splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
indexName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
TreeIndexInsertUpdateDeleteOperatorDescriptor btreeBulkLoad = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
new BTreeDataflowHelperFactory(), filterFactory, NoOpOperationCallbackProvider.INSTANCE, txnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad, splitsAndConstraint.second);
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getRTreeDmlRuntime(String datasetName,
String indexName, IOperatorSchema propagatedSchema, List<LogicalVariable> primaryKeys,
List<LogicalVariable> secondaryKeys, AsterixTupleFilterFactory filterFactory, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec, IndexOp indexOp) throws AlgebricksException {
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
String itemTypeName = compiledDatasetDecl.getItemTypeName();
IAType itemType;
try {
itemType = metadata.findType(itemTypeName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
if (itemType.getTypeTag() != ATypeTag.RECORD) {
throw new AlgebricksException("Only record types can be indexed.");
}
ARecordType recType = (ARecordType) itemType;
AqlCompiledIndexDecl cid = DatasetUtils.findSecondaryIndexByName(compiledDatasetDecl, indexName);
List<String> secondaryKeyExprs = cid.getFieldExprs();
Pair<IAType, Boolean> keyPairType = AqlCompiledIndexDecl.getNonNullableKeyFieldType(secondaryKeyExprs.get(0),
recType);
IAType spatialType = keyPairType.first;
int dimension = NonTaggedFormatUtil.getNumDimensions(spatialType.getTypeTag());
int numSecondaryKeys = dimension * 2;
int numPrimaryKeys = primaryKeys.size();
int numKeys = numSecondaryKeys + numPrimaryKeys;
ITypeTraits[] typeTraits = new ITypeTraits[numKeys];
IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[numKeys];
int[] fieldPermutation = new int[numKeys];
int i = 0;
for (LogicalVariable varKey : secondaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
for (LogicalVariable varKey : primaryKeys) {
int idx = propagatedSchema.findVariable(varKey);
fieldPermutation[i] = idx;
i++;
}
IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(spatialType.getTypeTag());
IPrimitiveValueProviderFactory[] valueProviderFactories = new IPrimitiveValueProviderFactory[numSecondaryKeys];
for (i = 0; i < numSecondaryKeys; i++) {
ISerializerDeserializer keySerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(nestedKeyType);
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(
nestedKeyType, true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(nestedKeyType);
valueProviderFactories[i] = AqlPrimitiveValueProviderFactory.INSTANCE;
}
for (Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType : DatasetUtils
.getPartitioningFunctions(compiledDatasetDecl)) {
IAType keyType = evalFactoryAndType.third;
comparatorFactories[i] = AqlBinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(keyType,
true);
typeTraits[i] = AqlTypeTraitProvider.INSTANCE.getTypeTrait(keyType);
++i;
}
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
try {
splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
indexName);
} catch (Exception e) {
throw new AlgebricksException(e);
}
TreeIndexInsertUpdateDeleteOperatorDescriptor rtreeUpdate = new TreeIndexInsertUpdateDeleteOperatorDescriptor(
spec, recordDesc, appContext.getStorageManagerInterface(), appContext.getIndexRegistryProvider(),
splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation, indexOp,
new RTreeDataflowHelperFactory(valueProviderFactories), filterFactory,
NoOpOperationCallbackProvider.INSTANCE, txnId);
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(rtreeUpdate, splitsAndConstraint.second);
}
public long getTxnId() {
return txnId;
}
public static ITreeIndexFrameFactory createBTreeNSMInteriorFrameFactory(ITypeTraits[] typeTraits) {
return new BTreeNSMInteriorFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
}
public static ITreeIndexFrameFactory createBTreeNSMLeafFrameFactory(ITypeTraits[] typeTraits) {
return new BTreeNSMLeafFrameFactory(new TypeAwareTupleWriterFactory(typeTraits));
}
@Override
public IFunctionInfo lookupFunction(FunctionIdentifier fid) {
return AsterixBuiltinFunctions.lookupFunction(fid);
}
}