/*
* Copyright 2009-2011 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package edu.uci.ics.asterix.file;

import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;
import edu.uci.ics.asterix.api.common.Job;
import edu.uci.ics.asterix.aql.translator.DdlTranslator.CompiledDatasetDropStatement;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.GlobalConfig;
import edu.uci.ics.asterix.common.config.OptimizationConfUtil;
import edu.uci.ics.asterix.common.context.AsterixStorageManagerInterface;
import edu.uci.ics.asterix.common.context.AsterixTreeRegistryProvider;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledDatasetDecl;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.runtime.operators.std.NoTupleSourceRuntimeFactory;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.translator.DmlTranslator.CompiledLoadFromFileStatement;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IEvaluatorFactory;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.base.IPushRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.jobgen.impl.ConnectorPolicyAssignmentPolicy;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.meta.AlgebricksMetaOperatorDescriptor;
import edu.uci.ics.hyracks.algebricks.core.algebra.runtime.operators.std.AssignRuntimeFactory;
import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.core.api.constraints.AlgebricksPartitionConstraintHelper;
import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
import edu.uci.ics.hyracks.algebricks.core.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.utils.Triple;
import edu.uci.ics.hyracks.api.dataflow.IConnectorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.IOperatorDescriptor;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.dataflow.value.ITypeTraits;
import edu.uci.ics.hyracks.api.dataflow.value.RecordDescriptor;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.job.JobSpecification;
import edu.uci.ics.hyracks.dataflow.common.data.partition.FieldHashPartitionComputerFactory;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.MToNPartitioningMergingConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.sort.ExternalSortOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndex;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexRegistryProvider;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexDropOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackProvider;
import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
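
/**
 * Builds Hyracks job specifications for dataset-level operations: dropping a
 * dataset together with its secondary indexes, initializing an empty internal
 * or feed dataset, and bulk loading a dataset from an external data source.
 */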
public class DatasetOperations {
private static final PhysicalOptimizationConfig physicalOptimizationConfig = OptimizationConfUtil
.getPhysicalOptimizationConfig();
private static final Logger LOGGER = Logger.getLogger(DatasetOperations.class.getName());
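
/**
 * Creates one drop job per index of the dataset: one for each secondary
 * index, followed by one for the primary index. External datasets have no
 * local storage, so an empty array is returned for them.
 */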
public static JobSpecification[] createDropDatasetJobSpec(CompiledDatasetDropStatement deleteStmt,
AqlCompiledMetadataDeclarations metadata) throws AlgebricksException, HyracksDataException,
RemoteException, ACIDException, AsterixException {
String datasetName = deleteStmt.getDatasetName();
String datasetPath = metadata.getRelativePath(datasetName);
LOGGER.info("DROP DATASETPATH: " + datasetPath);
IIndexRegistryProvider<IIndex> btreeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
if (adecl == null) {
throw new AlgebricksException("DROP DATASET: No metadata for dataset " + datasetName);
}
if (adecl.getDatasetType() == DatasetType.EXTERNAL) {
return new JobSpecification[0];
}
List<AqlCompiledIndexDecl> secondaryIndexes = DatasetUtils.getSecondaryIndexes(adecl);
JobSpecification[] specs;
if (secondaryIndexes != null && !secondaryIndexes.isEmpty()) {
int n = secondaryIndexes.size();
specs = new JobSpecification[n + 1];
int i = 0;
// first, drop indexes
for (AqlCompiledIndexDecl acid : secondaryIndexes) {
specs[i] = new JobSpecification();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, acid.getIndexName());
TreeIndexDropOperatorDescriptor secondaryBtreeDrop = new TreeIndexDropOperatorDescriptor(specs[i],
storageManager, btreeRegistryProvider, idxSplitsAndConstraint.first);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specs[i], secondaryBtreeDrop,
idxSplitsAndConstraint.second);
i++;
}
} else {
specs = new JobSpecification[1];
}
JobSpecification specPrimary = new JobSpecification();
specs[specs.length - 1] = specPrimary;
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, datasetName);
TreeIndexDropOperatorDescriptor primaryBtreeDrop = new TreeIndexDropOperatorDescriptor(specPrimary,
storageManager, btreeRegistryProvider, splitsAndConstraint.first);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
splitsAndConstraint.second);
specPrimary.addRoot(primaryBtreeDrop);
return specs;
}
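
/**
 * Creates a job that initializes the primary B-tree of an empty internal or
 * feed dataset: a no-tuple source feeds a bulk-load operator so that the
 * index files are created on every partition.
 */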
public static JobSpecification[] createInitializeDatasetJobSpec(long txnId, String datasetName,
AqlCompiledMetadataDeclarations metadata) throws AsterixException {
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
throw new AsterixException("Could not find dataset " + datasetName);
}
if (compiledDatasetDecl.getDatasetType() != DatasetType.INTERNAL
&& compiledDatasetDecl.getDatasetType() != DatasetType.FEED) {
throw new AsterixException("Cannot initialize dataset (" + datasetName + ")" + "of type "
+ compiledDatasetDecl.getDatasetType());
}
ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
IDataFormat format;
ISerializerDeserializer payloadSerde;
IBinaryComparatorFactory[] comparatorFactories;
ITypeTraits[] typeTraits;
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
try {
format = metadata.getFormat();
payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(compiledDatasetDecl, metadata
.getFormat().getBinaryComparatorFactoryProvider());
typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
datasetName);
} catch (AlgebricksException e1) {
throw new AsterixException(e1);
}
IIndexRegistryProvider<IIndex> btreeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
JobSpecification spec = new JobSpecification();
RecordDescriptor recDesc;
try {
recDesc = computePayloadKeyRecordDescriptor(compiledDatasetDecl, payloadSerde, metadata.getFormat());
NoTupleSourceRuntimeFactory factory = new NoTupleSourceRuntimeFactory();
AlgebricksMetaOperatorDescriptor asterixOp = new AlgebricksMetaOperatorDescriptor(spec, 0, 1,
new IPushRuntimeFactory[] { factory }, new RecordDescriptor[] { recDesc });
// move the key fields to the front; the payload record (input field 0) goes last
List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
.getPartitioningFunctions(compiledDatasetDecl);
int numKeys = partitioningFunctions.size();
int[] keys = new int[numKeys];
for (int i = 0; i < numKeys; i++) {
keys[i] = i + 1;
}
int[] fieldPermutation = new int[numKeys + 1];
System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
fieldPermutation[numKeys] = 0;
TreeIndexBulkLoadOperatorDescriptor bulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, btreeRegistryProvider, splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixOp,
splitsAndConstraint.second);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, bulkLoad,
splitsAndConstraint.second);
spec.connect(new OneToOneConnectorDescriptor(spec), asterixOp, 0, bulkLoad, 0);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
spec.addRoot(bulkLoad);
} catch (AlgebricksException e) {
throw new AsterixException(e);
}
return new JobSpecification[] { spec };
}
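
/**
 * Creates the jobs needed to bulk load a dataset: first the jobs that drop
 * the existing primary and secondary indexes, then a job that scans the
 * external source, computes the primary key fields, partitions (and, if
 * necessary, sorts) the records by key, and bulk loads the primary B-tree.
 */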
@SuppressWarnings("unchecked")
public static List<Job> createLoadDatasetJobSpec(CompiledLoadFromFileStatement loadStmt,
AqlCompiledMetadataDeclarations metadata) throws AsterixException {
String datasetName = loadStmt.getDatasetName();
AqlCompiledDatasetDecl compiledDatasetDecl = metadata.findDataset(datasetName);
if (compiledDatasetDecl == null) {
throw new AsterixException("Could not find dataset " + datasetName);
}
if (compiledDatasetDecl.getDatasetType() != DatasetType.INTERNAL
&& compiledDatasetDecl.getDatasetType() != DatasetType.FEED) {
throw new AsterixException("Cannot load data into dataset (" + datasetName + ")" + "of type "
+ compiledDatasetDecl.getDatasetType());
}
List<Job> jobSpecs = new ArrayList<Job>();
try {
jobSpecs.addAll(dropDatasetIndexes(datasetName, metadata));
} catch (AlgebricksException ae) {
throw new AsterixException(ae);
}
ARecordType itemType = (ARecordType) metadata.findType(compiledDatasetDecl.getItemTypeName());
IDataFormat format;
try {
format = metadata.getFormat();
} catch (AlgebricksException e1) {
throw new AsterixException(e1);
}
ISerializerDeserializer payloadSerde;
try {
payloadSerde = format.getSerdeProvider().getSerializerDeserializer(itemType);
} catch (AlgebricksException e) {
throw new AsterixException(e);
}
IBinaryHashFunctionFactory[] hashFactories;
IBinaryComparatorFactory[] comparatorFactories;
ITypeTraits[] typeTraits;
try {
hashFactories = DatasetUtils.computeKeysBinaryHashFunFactories(compiledDatasetDecl, metadata.getFormat()
.getBinaryHashFunctionFactoryProvider());
comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(compiledDatasetDecl, metadata
.getFormat().getBinaryComparatorFactoryProvider());
typeTraits = DatasetUtils.computeTupleTypeTraits(compiledDatasetDecl, metadata);
} catch (AlgebricksException e) {
throw new AsterixException(e);
}
JobSpecification spec = new JobSpecification();
IOperatorDescriptor scanner;
AlgebricksPartitionConstraint scannerPc;
RecordDescriptor recDesc;
try {
AqlCompiledExternalDatasetDetails add = new AqlCompiledExternalDatasetDetails(loadStmt.getAdapter(),
loadStmt.getProperties());
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> p = AqlMetadataProvider
.buildExternalDataScannerRuntime(spec, itemType, add, format);
scanner = p.first;
scannerPc = p.second;
recDesc = computePayloadKeyRecordDescriptor(compiledDatasetDecl, payloadSerde, metadata.getFormat());
} catch (AlgebricksException e) {
throw new AsterixException(e);
}
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, scanner, scannerPc);
AssignRuntimeFactory assign = makeAssignRuntimeFactory(compiledDatasetDecl);
AlgebricksMetaOperatorDescriptor asterixOp = new AlgebricksMetaOperatorDescriptor(spec, 1, 1,
new IPushRuntimeFactory[] { assign }, new RecordDescriptor[] { recDesc });
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, asterixOp, scannerPc);
List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
.getPartitioningFunctions(compiledDatasetDecl);
int numKeys = partitioningFunctions.size();
int[] keys = new int[numKeys];
for (int i = 0; i < numKeys; i++) {
keys[i] = i + 1;
}
int framesLimit = physicalOptimizationConfig.getMaxFramesExternalSort();
IIndexRegistryProvider<IIndex> btreeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
// move key fields to front
int[] fieldPermutation = new int[numKeys + 1];
System.arraycopy(keys, 0, fieldPermutation, 0, numKeys);
fieldPermutation[numKeys] = 0;
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint;
try {
splitsAndConstraint = metadata.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName,
datasetName);
} catch (AlgebricksException e) {
throw new AsterixException(e);
}
FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < fs.length; i++) {
sb.append(stringOf(fs[i]) + " ");
}
LOGGER.info("LOAD into File Splits: " + sb.toString());
TreeIndexBulkLoadOperatorDescriptor btreeBulkLoad = new TreeIndexBulkLoadOperatorDescriptor(spec,
storageManager, btreeRegistryProvider, splitsAndConstraint.first, typeTraits, comparatorFactories, fieldPermutation,
GlobalConfig.DEFAULT_BTREE_FILL_FACTOR, new BTreeDataflowHelperFactory(), NoOpOperationCallbackProvider.INSTANCE);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, btreeBulkLoad,
splitsAndConstraint.second);
spec.connect(new OneToOneConnectorDescriptor(spec), scanner, 0, asterixOp, 0);
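// If the input is not already sorted on the primary key, hash-partition by key and sort
// locally before bulk loading; otherwise use a partitioning-merging connector to preserve
// the pre-sorted order across partitions.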
if (!loadStmt.alreadySorted()) {
ExternalSortOperatorDescriptor sorter = new ExternalSortOperatorDescriptor(spec, framesLimit, keys,
comparatorFactories, recDesc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, sorter,
splitsAndConstraint.second);
IConnectorDescriptor hashConn = new MToNPartitioningConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(keys, hashFactories));
spec.connect(hashConn, asterixOp, 0, sorter, 0);
spec.connect(new OneToOneConnectorDescriptor(spec), sorter, 0, btreeBulkLoad, 0);
} else {
IConnectorDescriptor sortMergeConn = new MToNPartitioningMergingConnectorDescriptor(spec,
new FieldHashPartitionComputerFactory(keys, hashFactories), keys, comparatorFactories);
spec.connect(sortMergeConn, asterixOp, 0, btreeBulkLoad, 0);
}
spec.addRoot(btreeBulkLoad);
spec.setConnectorPolicyAssignmentPolicy(new ConnectorPolicyAssignmentPolicy());
jobSpecs.add(new Job(spec));
return jobSpecs;
}
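
/**
 * Builds one index-drop job per index of the dataset, covering both the
 * secondary indexes and the primary index.
 */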
private static List<Job> dropDatasetIndexes(String datasetName, AqlCompiledMetadataDeclarations metadata)
throws AlgebricksException, MetadataException {
AqlCompiledDatasetDecl adecl = metadata.findDataset(datasetName);
if (adecl == null) {
throw new AlgebricksException("DROP DATASET INDEXES: No metadata for dataset " + datasetName);
}
List<AqlCompiledIndexDecl> indexes = DatasetUtils.getSecondaryIndexes(adecl);
indexes.add(DatasetUtils.getPrimaryIndex(adecl));
List<Job> specs = new ArrayList<Job>();
IIndexRegistryProvider<IIndex> btreeRegistryProvider = AsterixTreeRegistryProvider.INSTANCE;
IStorageManagerInterface storageManager = AsterixStorageManagerInterface.INSTANCE;
if (indexes != null && !indexes.isEmpty()) {
// first, drop indexes
for (AqlCompiledIndexDecl acid : indexes) {
JobSpecification spec = new JobSpecification();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> idxSplitsAndConstraint = metadata
.splitProviderAndPartitionConstraintsForInternalOrFeedDataset(datasetName, acid.getIndexName());
TreeIndexDropOperatorDescriptor secondaryBtreeDrop = new TreeIndexDropOperatorDescriptor(spec,
storageManager, btreeRegistryProvider, idxSplitsAndConstraint.first);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, secondaryBtreeDrop,
idxSplitsAndConstraint.second);
specs.add(new Job(spec));
}
}
return specs;
}
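
/** Renders a file split as "node:localPath" for logging. */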
private static String stringOf(FileSplit fs) {
return fs.getNodeName() + ":" + fs.getLocalFile().toString();
}
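
/**
 * Builds an assign runtime that evaluates the dataset's partitioning
 * functions over the incoming record (field 0) and projects the record
 * followed by the computed key fields.
 */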
private static AssignRuntimeFactory makeAssignRuntimeFactory(AqlCompiledDatasetDecl compiledDatasetDecl) {
List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
.getPartitioningFunctions(compiledDatasetDecl);
int numKeys = partitioningFunctions.size();
IEvaluatorFactory[] evalFactories = new IEvaluatorFactory[numKeys];
for (int i = 0; i < numKeys; i++) {
Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType = partitioningFunctions
.get(i);
evalFactories[i] = evalFactoryAndType.first;
}
int[] outColumns = new int[numKeys];
int[] projectionList = new int[numKeys + 1];
projectionList[0] = 0;
for (int i = 0; i < numKeys; i++) {
outColumns[i] = i + 1;
projectionList[i + 1] = i + 1;
}
return new AssignRuntimeFactory(outColumns, evalFactories, projectionList);
}
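
/**
 * Builds the record descriptor for a tuple that holds the payload record in
 * field 0 followed by one field per primary key.
 */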
@SuppressWarnings("unchecked")
private static RecordDescriptor computePayloadKeyRecordDescriptor(AqlCompiledDatasetDecl compiledDatasetDecl,
ISerializerDeserializer payloadSerde, IDataFormat dataFormat) throws AlgebricksException {
List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningFunctions = DatasetUtils
.getPartitioningFunctions(compiledDatasetDecl);
int numKeys = partitioningFunctions.size();
ISerializerDeserializer[] recordFields = new ISerializerDeserializer[1 + numKeys];
recordFields[0] = payloadSerde;
for (int i = 0; i < numKeys; i++) {
Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFactoryAndType = partitioningFunctions
.get(i);
IAType keyType = evalFactoryAndType.third;
ISerializerDeserializer keySerde = dataFormat.getSerdeProvider().getSerializerDeserializer(keyType);
recordFields[i + 1] = keySerde;
}
return new RecordDescriptor(recordFields);
}
}