/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.metadata;
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
import edu.uci.ics.asterix.common.transactions.DatasetId;
import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType;
import edu.uci.ics.asterix.common.transactions.ITransactionContext;
import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.metadata.api.IMetadataIndex;
import edu.uci.ics.asterix.metadata.api.IMetadataNode;
import edu.uci.ics.asterix.metadata.api.IValueExtractor;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes;
import edu.uci.ics.asterix.metadata.bootstrap.MetadataSecondaryIndexes;
import edu.uci.ics.asterix.metadata.entities.CompactionPolicy;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Library;
import edu.uci.ics.asterix.metadata.entities.Node;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
import edu.uci.ics.asterix.metadata.entitytupletranslators.CompactionPolicyTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.DatasetTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.DatasourceAdapterTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.DataverseTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.ExternalFileTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedActivityTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedPolicyTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.FunctionTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.IndexTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.LibraryTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeGroupTupleTranslator;
import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeTupleTranslator;
import edu.uci.ics.asterix.metadata.feeds.FeedActivityIdFactory;
import edu.uci.ics.asterix.metadata.valueextractors.DatasetNameValueExtractor;
import edu.uci.ics.asterix.metadata.valueextractors.DatatypeNameValueExtractor;
import edu.uci.ics.asterix.metadata.valueextractors.MetadataEntityValueExtractor;
import edu.uci.ics.asterix.metadata.valueextractors.NestedDatatypeNameValueExtractor;
import edu.uci.ics.asterix.metadata.valueextractors.TupleCopyValueExtractor;
import edu.uci.ics.asterix.om.base.AInt32;
import edu.uci.ics.asterix.om.base.AMutableString;
import edu.uci.ics.asterix.om.base.AString;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback;
import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator;
import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils;
import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate;
import edu.uci.ics.hyracks.storage.am.common.api.IIndex;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor;
import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException;
import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException;
import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
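// MetadataNode is the node-local singleton behind the metadata interface. It
// persists every metadata entity (dataverses, datasets, indexes, datatypes,
// functions, feeds, ...) in local LSM B-trees and runs each modification
// inside a metadata transaction; methods throw RemoteException because the
// node is accessed remotely.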
public class MetadataNode implements IMetadataNode {
private static final long serialVersionUID = 1L;
private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataPrimaryIndexes.METADATA_DATASET_ID);
private IIndexLifecycleManager indexLifecycleManager;
private ITransactionSubsystem transactionSubsystem;
public static final MetadataNode INSTANCE = new MetadataNode();
private MetadataNode() {
super();
}
public void initialize(IAsterixAppRuntimeContext runtimeContext) {
this.transactionSubsystem = runtimeContext.getTransactionSubsystem();
this.indexLifecycleManager = runtimeContext.getIndexLifecycleManager();
}
@Override
public void beginTransaction(JobId transactionId) throws ACIDException, RemoteException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().beginTransaction(transactionId);
txnCtx.setMetadataTransaction(true);
}
@Override
public void commitTransaction(JobId jobId) throws RemoteException, ACIDException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1);
}
@Override
public void abortTransaction(JobId jobId) throws RemoteException, ACIDException {
try {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId,
false);
transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1);
} catch (ACIDException e) {
e.printStackTrace();
throw e;
}
}
@Override
public void lock(JobId jobId, byte lockMode) throws ACIDException, RemoteException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
transactionSubsystem.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
}
@Override
public void unlock(JobId jobId, byte lockMode) throws ACIDException, RemoteException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, lockMode, txnCtx);
}
@Override
public void addDataverse(JobId jobId, Dataverse dataverse) throws MetadataException, RemoteException {
try {
DataverseTupleTranslator tupleReaderWriter = new DataverseTupleTranslator(true);
ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(dataverse);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, tuple);
} catch (TreeIndexDuplicateKeyException e) {
throw new MetadataException("A dataverse with this name " + dataverse.getDataverseName()
+ " already exists.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void addDataset(JobId jobId, Dataset dataset) throws MetadataException, RemoteException {
try {
// Insert into the 'dataset' dataset.
DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(true);
ITupleReference datasetTuple = tupleReaderWriter.getTupleFromMetadataEntity(dataset);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, datasetTuple);
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
// Add the primary index for the dataset.
InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails();
Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(),
dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp());
addIndex(jobId, primaryIndex);
// Add an entry for the node group
ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
dataset.getDatasetName());
insertTupleIntoIndex(jobId, MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX, nodeGroupTuple);
} else if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
//added for external data
ExternalDatasetDetails id = (ExternalDatasetDetails) dataset.getDatasetDetails();
ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(),
dataset.getDatasetName());
insertTupleIntoIndex(jobId, MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX, nodeGroupTuple);
}
// Add entry in datatype secondary index.
ITupleReference dataTypeTuple = createTuple(dataset.getDataverseName(), dataset.getItemTypeName(),
dataset.getDatasetName());
insertTupleIntoIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX, dataTypeTuple);
} catch (TreeIndexDuplicateKeyException e) {
throw new MetadataException("A dataset with this name " + dataset.getDatasetName()
+ " already exists in dataverse '" + dataset.getDataverseName() + "'.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void addIndex(JobId jobId, Index index) throws MetadataException, RemoteException {
try {
IndexTupleTranslator tupleWriter = new IndexTupleTranslator(true);
ITupleReference tuple = tupleWriter.getTupleFromMetadataEntity(index);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, tuple);
} catch (TreeIndexDuplicateKeyException e) {
throw new MetadataException("An index with name '" + index.getIndexName() + "' already exists.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void addNode(JobId jobId, Node node) throws MetadataException, RemoteException {
try {
NodeTupleTranslator tupleReaderWriter = new NodeTupleTranslator(true);
ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(node);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.NODE_DATASET, tuple);
} catch (TreeIndexDuplicateKeyException e) {
throw new MetadataException("A node with name '" + node.getNodeName() + "' already exists.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void addNodeGroup(JobId jobId, NodeGroup nodeGroup) throws MetadataException, RemoteException {
try {
NodeGroupTupleTranslator tupleReaderWriter = new NodeGroupTupleTranslator(true);
ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(nodeGroup);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.NODEGROUP_DATASET, tuple);
} catch (TreeIndexDuplicateKeyException e) {
throw new MetadataException("A nodegroup with name '" + nodeGroup.getNodeGroupName() + "' already exists.",
e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void addDatatype(JobId jobId, Datatype datatype) throws MetadataException, RemoteException {
try {
DatatypeTupleTranslator tupleReaderWriter = new DatatypeTupleTranslator(jobId, this, true);
ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(datatype);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, tuple);
} catch (TreeIndexDuplicateKeyException e) {
throw new MetadataException("A datatype with name '" + datatype.getDatatypeName() + "' already exists.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void addFunction(JobId jobId, Function function) throws MetadataException, RemoteException {
try {
// Insert into the 'function' dataset.
FunctionTupleTranslator tupleReaderWriter = new FunctionTupleTranslator(true);
ITupleReference functionTuple = tupleReaderWriter.getTupleFromMetadataEntity(function);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, functionTuple);
} catch (TreeIndexDuplicateKeyException e) {
throw new MetadataException("A function with this name " + function.getName() + " and arity "
+ function.getArity() + " already exists in dataverse '" + function.getDataverseName() + "'.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
public void insertIntoDatatypeSecondaryIndex(JobId jobId, String dataverseName, String nestedTypeName,
String topTypeName) throws Exception {
ITupleReference tuple = createTuple(dataverseName, nestedTypeName, topTypeName);
insertTupleIntoIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, tuple);
}
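// Common insert path for all metadata entities: looks the backing LSM index
// up by resource id, opens it, registers the index and a transaction-aware
// modification callback with the metadata transaction, and force-inserts
// the tuple.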
private void insertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
throws Exception {
long resourceID = metadataIndex.getResourceID();
ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
indexLifecycleManager.open(resourceID);
// prepare a Callback for logging
IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex,
lsmIndex, IndexOperation.INSERT);
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
txnCtx.setWriteTxn(true);
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
// TODO: fix exceptions once new BTree exception model is in hyracks.
indexAccessor.forceInsert(tuple);
indexLifecycleManager.close(resourceID);
}
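// Picks the modification callback matching the target index; primary and
// secondary metadata indexes use different callback implementations, both
// keyed on the dataset id and the index's primary-key fields.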
private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId,
IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws Exception {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
if (metadataIndex.isPrimaryIndex()) {
return new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp);
} else {
return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp);
}
}
@Override
public void dropDataverse(JobId jobId, String dataverseName) throws MetadataException, RemoteException {
try {
List<Dataset> dataverseDatasets = getDataverseDatasets(jobId, dataverseName);
if (dataverseDatasets != null && !dataverseDatasets.isEmpty()) {
// Drop all datasets in this dataverse.
for (Dataset ds : dataverseDatasets) {
dropDataset(jobId, dataverseName, ds.getDatasetName());
}
}
// As a side effect, acquires an S lock on the 'datatype' dataset
// on behalf of txnId.
List<Datatype> dataverseDatatypes = getDataverseDatatypes(jobId, dataverseName);
if (dataverseDatatypes != null && !dataverseDatatypes.isEmpty()) {
// Drop all types in this dataverse.
for (Datatype datatype : dataverseDatatypes) {
forceDropDatatype(jobId, dataverseName, datatype.getDatatypeName());
}
}
// As a side effect, acquires an S lock on the 'Function' dataset
// on behalf of txnId.
List<Function> dataverseFunctions = getDataverseFunctions(jobId, dataverseName);
if (dataverseFunctions != null && dataverseFunctions.size() > 0) {
// Drop all functions in this dataverse.
for (Function function : dataverseFunctions) {
dropFunction(jobId, new FunctionSignature(dataverseName, function.getName(), function.getArity()));
}
}
// As a side effect, acquires an S lock on the 'Adapter' dataset
// on behalf of txnId.
List<DatasourceAdapter> dataverseAdapters = getDataverseAdapters(jobId, dataverseName);
if (dataverseAdapters != null && dataverseAdapters.size() > 0) {
// Drop all adapters in this dataverse.
for (DatasourceAdapter adapter : dataverseAdapters) {
dropAdapter(jobId, dataverseName, adapter.getAdapterIdentifier().getAdapterName());
}
}
List<Feed> dataverseFeeds = getDataverseFeeds(jobId, dataverseName);
if (dataverseFeeds != null && !dataverseFeeds.isEmpty()) {
// Drop all feeds in this dataverse.
for (Feed feed : dataverseFeeds) {
dropFeed(jobId, dataverseName, feed.getFeedName());
}
}
// Delete the dataverse entry from the 'dataverse' dataset.
ITupleReference searchKey = createTuple(dataverseName);
// As a side effect, acquires an S lock on the 'dataverse' dataset
// on behalf of txnId.
ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, tuple);
// TODO: Change this to be a BTree specific exception, e.g.,
// BTreeKeyDoesNotExistException.
} catch (TreeIndexException e) {
throw new MetadataException("Cannot drop dataverse '" + dataverseName + "' because it doesn't exist.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void dropDataset(JobId jobId, String dataverseName, String datasetName) throws MetadataException,
RemoteException {
Dataset dataset;
try {
dataset = getDataset(jobId, dataverseName, datasetName);
} catch (Exception e) {
throw new MetadataException(e);
}
if (dataset == null) {
throw new MetadataException("Cannot drop dataset '" + datasetName + "' because it doesn't exist.");
}
try {
// Delete entry from the 'datasets' dataset.
ITupleReference searchKey = createTuple(dataverseName, datasetName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'dataset' dataset.
try {
ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASET_DATASET,
searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, datasetTuple);
} catch (TreeIndexException tie) {
// ignore this exception and continue deleting all relevant
// artifacts.
}
// Delete entry from secondary index 'group'.
ITupleReference groupNameSearchKey = createTuple(dataset.getDatasetDetails().getNodeGroupName(),
dataverseName, datasetName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the GROUPNAME_ON_DATASET_INDEX index.
try {
ITupleReference groupNameTuple = getTupleToBeDeleted(jobId,
MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX, groupNameSearchKey);
deleteTupleFromIndex(jobId, MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX, groupNameTuple);
} catch (TreeIndexException tie) {
// ignore this exception and continue deleting all relevant
// artifacts.
}
// Delete entry from secondary index 'type'.
ITupleReference dataTypeSearchKey = createTuple(dataverseName, dataset.getItemTypeName(), datasetName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the DATATYPENAME_ON_DATASET_INDEX index.
try {
ITupleReference dataTypeTuple = getTupleToBeDeleted(jobId,
MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX, dataTypeSearchKey);
deleteTupleFromIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX, dataTypeTuple);
} catch (TreeIndexException tie) {
// ignore this exception and continue deleting all relevant
// artifacts.
}
// Delete entry(s) from the 'indexes' dataset.
List<Index> datasetIndexes = getDatasetIndexes(jobId, dataverseName, datasetName);
if (datasetIndexes != null) {
for (Index index : datasetIndexes) {
dropIndex(jobId, dataverseName, datasetName, index.getIndexName());
}
}
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
// Delete External Files
// As a side effect, acquires an S lock on the 'ExternalFile' dataset
// on behalf of txnId.
List<ExternalFile> datasetFiles = getExternalFiles(jobId, dataset);
if (datasetFiles != null && datasetFiles.size() > 0) {
// Drop all external files in this dataset.
for (ExternalFile file : datasetFiles) {
dropExternalFile(jobId, dataverseName, file.getDatasetName(), file.getFileNumber());
}
}
}
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void dropIndex(JobId jobId, String dataverseName, String datasetName, String indexName)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName, datasetName, indexName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'index' dataset.
ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.INDEX_DATASET, searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, tuple);
// TODO: Change this to be a BTree specific exception, e.g.,
// BTreeKeyDoesNotExistException.
} catch (TreeIndexException e) {
throw new MetadataException("Cannot drop index '" + datasetName + "." + indexName
+ "' because it doesn't exist.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void dropNodegroup(JobId jobId, String nodeGroupName) throws MetadataException, RemoteException {
List<String> datasetNames;
try {
datasetNames = getDatasetNamesPartitionedOnThisNodeGroup(jobId, nodeGroupName);
} catch (Exception e) {
throw new MetadataException(e);
}
if (!datasetNames.isEmpty()) {
StringBuilder sb = new StringBuilder();
sb.append("Nodegroup '" + nodeGroupName
+ "' cannot be dropped; it was used for partitioning these datasets:");
for (int i = 0; i < datasetNames.size(); i++) {
sb.append("\n" + (i + 1) + "- " + datasetNames.get(i) + ".");
}
throw new MetadataException(sb.toString());
}
try {
ITupleReference searchKey = createTuple(nodeGroupName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'nodegroup' dataset.
ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.NODEGROUP_DATASET, searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.NODEGROUP_DATASET, tuple);
// TODO: Change this to be a BTree specific exception, e.g.,
// BTreeKeyDoesNotExistException.
} catch (TreeIndexException e) {
throw new MetadataException("Cannot drop nodegroup '" + nodeGroupName + "' because it doesn't exist", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void dropDatatype(JobId jobId, String dataverseName, String datatypeName) throws MetadataException,
RemoteException {
List<String> datasetNames;
List<String> usedDatatypes;
try {
datasetNames = getDatasetNamesDeclaredByThisDatatype(jobId, dataverseName, datatypeName);
usedDatatypes = getDatatypeNamesUsingThisDatatype(jobId, dataverseName, datatypeName);
} catch (Exception e) {
throw new MetadataException(e);
}
// Check whether type is being used by datasets.
if (!datasetNames.isEmpty()) {
StringBuilder sb = new StringBuilder();
sb.append("Cannot drop type '" + datatypeName + "'; it was used when creating these datasets:");
for (int i = 0; i < datasetNames.size(); i++) {
sb.append("\n" + (i + 1) + "- " + datasetNames.get(i) + ".");
}
throw new MetadataException(sb.toString());
}
// Check whether type is being used by other types.
if (!usedDatatypes.isEmpty()) {
StringBuilder sb = new StringBuilder();
sb.append("Cannot drop type '" + datatypeName + "'; it is used in these datatypes:");
for (int i = 0; i < usedDatatypes.size(); i++) {
sb.append("\n" + (i + 1) + "- " + usedDatatypes.get(i) + ".");
}
throw new MetadataException(sb.toString());
}
// Delete the datatype entry, including all its nested types.
try {
ITupleReference searchKey = createTuple(dataverseName, datatypeName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'datatype' dataset.
ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey);
// This call uses the secondary index on datatype. Get nested types
// before deleting entry from secondary index.
List<String> nestedTypes = getNestedDatatypeNames(jobId, dataverseName, datatypeName);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, tuple);
deleteFromDatatypeSecondaryIndex(jobId, dataverseName, datatypeName);
for (String nestedType : nestedTypes) {
Datatype dt = getDatatype(jobId, dataverseName, nestedType);
if (dt != null && dt.getIsAnonymous()) {
dropDatatype(jobId, dataverseName, dt.getDatatypeName());
}
}
// TODO: Change this to be a BTree specific exception, e.g.,
// BTreeKeyDoesNotExistException.
} catch (TreeIndexException e) {
throw new MetadataException("Cannot drop type '" + datatypeName + "' because it doesn't exist", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
private void forceDropDatatype(JobId jobId, String dataverseName, String datatypeName) throws AsterixException {
try {
ITupleReference searchKey = createTuple(dataverseName, datatypeName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'datatype' dataset.
ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, tuple);
deleteFromDatatypeSecondaryIndex(jobId, dataverseName, datatypeName);
// TODO: Change this to be a BTree specific exception, e.g.,
// BTreeKeyDoesNotExistException.
} catch (TreeIndexException e) {
throw new AsterixException("Cannot drop type '" + datatypeName + "' because it doesn't exist", e);
} catch (AsterixException e) {
throw e;
} catch (Exception e) {
throw new AsterixException(e);
}
}
private void deleteFromDatatypeSecondaryIndex(JobId jobId, String dataverseName, String datatypeName)
throws AsterixException {
try {
List<String> nestedTypes = getNestedDatatypeNames(jobId, dataverseName, datatypeName);
for (String nestedType : nestedTypes) {
ITupleReference searchKey = createTuple(dataverseName, nestedType, datatypeName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the DATATYPENAME_ON_DATATYPE_INDEX index.
ITupleReference tuple = getTupleToBeDeleted(jobId,
MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, searchKey);
deleteTupleFromIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, tuple);
}
// TODO: Change this to be a BTree specific exception, e.g.,
// BTreeKeyDoesNotExistException.
} catch (TreeIndexException e) {
throw new AsterixException("Cannot drop type '" + datatypeName + "' because it doesn't exist", e);
} catch (AsterixException e) {
throw e;
} catch (Exception e) {
throw new AsterixException(e);
}
}
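// Delete-side counterpart of insertTupleIntoIndex: the same open/register
// sequence, but ends in a forceDelete instead of a forceInsert.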
private void deleteTupleFromIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
throws Exception {
long resourceID = metadataIndex.getResourceID();
ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID);
indexLifecycleManager.open(resourceID);
// prepare a Callback for logging
IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex,
lsmIndex, IndexOperation.DELETE);
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
txnCtx.setWriteTxn(true);
txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback,
metadataIndex.isPrimaryIndex());
indexAccessor.forceDelete(tuple);
indexLifecycleManager.close(resourceID);
}
@Override
public List<Dataverse> getDataverses(JobId jobId) throws MetadataException, RemoteException {
try {
DataverseTupleTranslator tupleReaderWriter = new DataverseTupleTranslator(false);
IValueExtractor<Dataverse> valueExtractor = new MetadataEntityValueExtractor<Dataverse>(tupleReaderWriter);
List<Dataverse> results = new ArrayList<Dataverse>();
searchIndex(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, null, valueExtractor, results);
if (results.isEmpty()) {
return null;
}
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public Dataverse getDataverse(JobId jobId, String dataverseName) throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName);
DataverseTupleTranslator tupleReaderWriter = new DataverseTupleTranslator(false);
IValueExtractor<Dataverse> valueExtractor = new MetadataEntityValueExtractor<Dataverse>(tupleReaderWriter);
List<Dataverse> results = new ArrayList<Dataverse>();
searchIndex(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, searchKey, valueExtractor, results);
if (results.isEmpty()) {
return null;
}
return results.get(0);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public List<Dataset> getDataverseDatasets(JobId jobId, String dataverseName) throws MetadataException,
RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName);
DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(false);
IValueExtractor<Dataset> valueExtractor = new MetadataEntityValueExtractor<Dataset>(tupleReaderWriter);
List<Dataset> results = new ArrayList<Dataset>();
searchIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, searchKey, valueExtractor, results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public List<Feed> getDataverseFeeds(JobId jobId, String dataverseName) throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName);
FeedTupleTranslator tupleReaderWriter = new FeedTupleTranslator(false);
IValueExtractor<Feed> valueExtractor = new MetadataEntityValueExtractor<Feed>(tupleReaderWriter);
List<Feed> results = new ArrayList<Feed>();
searchIndex(jobId, MetadataPrimaryIndexes.FEED_DATASET, searchKey, valueExtractor, results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
public List<Library> getDataverseLibraries(JobId jobId, String dataverseName) throws MetadataException,
RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName);
LibraryTupleTranslator tupleReaderWriter = new LibraryTupleTranslator(false);
IValueExtractor<Library> valueExtractor = new MetadataEntityValueExtractor<Library>(tupleReaderWriter);
List<Library> results = new ArrayList<Library>();
searchIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, searchKey, valueExtractor, results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
private List<Datatype> getDataverseDatatypes(JobId jobId, String dataverseName) throws MetadataException,
RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName);
DatatypeTupleTranslator tupleReaderWriter = new DatatypeTupleTranslator(jobId, this, false);
IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<Datatype>(tupleReaderWriter);
List<Datatype> results = new ArrayList<Datatype>();
searchIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey, valueExtractor, results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public Dataset getDataset(JobId jobId, String dataverseName, String datasetName) throws MetadataException,
RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName, datasetName);
DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(false);
List<Dataset> results = new ArrayList<Dataset>();
IValueExtractor<Dataset> valueExtractor = new MetadataEntityValueExtractor<Dataset>(tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, searchKey, valueExtractor, results);
if (results.isEmpty()) {
return null;
}
return results.get(0);
} catch (Exception e) {
throw new MetadataException(e);
}
}
private List<String> getDatasetNamesDeclaredByThisDatatype(JobId jobId, String dataverseName, String datatypeName)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName, datatypeName);
List<String> results = new ArrayList<String>();
IValueExtractor<String> valueExtractor = new DatasetNameValueExtractor();
searchIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX, searchKey, valueExtractor,
results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
public List<String> getDatatypeNamesUsingThisDatatype(JobId jobId, String dataverseName, String datatypeName)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName, datatypeName);
List<String> results = new ArrayList<String>();
IValueExtractor<String> valueExtractor = new DatatypeNameValueExtractor(dataverseName, this);
searchIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, searchKey, valueExtractor,
results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
private List<String> getNestedDatatypeNames(JobId jobId, String dataverseName, String datatypeName)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName);
List<String> results = new ArrayList<String>();
IValueExtractor<String> valueExtractor = new NestedDatatypeNameValueExtractor(datatypeName);
searchIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, searchKey, valueExtractor,
results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
public List<String> getDatasetNamesPartitionedOnThisNodeGroup(JobId jobId, String nodegroup)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(nodegroup);
List<String> results = new ArrayList<String>();
IValueExtractor<String> valueExtractor = new DatasetNameValueExtractor();
searchIndex(jobId, MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX, searchKey, valueExtractor, results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public Index getIndex(JobId jobId, String dataverseName, String datasetName, String indexName)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName, datasetName, indexName);
IndexTupleTranslator tupleReaderWriter = new IndexTupleTranslator(false);
IValueExtractor<Index> valueExtractor = new MetadataEntityValueExtractor<Index>(tupleReaderWriter);
List<Index> results = new ArrayList<Index>();
searchIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, searchKey, valueExtractor, results);
if (results.isEmpty()) {
return null;
}
return results.get(0);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public List<Index> getDatasetIndexes(JobId jobId, String dataverseName, String datasetName)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName, datasetName);
IndexTupleTranslator tupleReaderWriter = new IndexTupleTranslator(false);
IValueExtractor<Index> valueExtractor = new MetadataEntityValueExtractor<Index>(tupleReaderWriter);
List<Index> results = new ArrayList<Index>();
searchIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, searchKey, valueExtractor, results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public Datatype getDatatype(JobId jobId, String dataverseName, String datatypeName) throws MetadataException,
RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName, datatypeName);
DatatypeTupleTranslator tupleReaderWriter = new DatatypeTupleTranslator(jobId, this, false);
IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<Datatype>(tupleReaderWriter);
List<Datatype> results = new ArrayList<Datatype>();
searchIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey, valueExtractor, results);
if (results.isEmpty()) {
return null;
}
return results.get(0);
} catch (Exception e) {
e.printStackTrace();
throw new MetadataException(e);
}
}
@Override
public NodeGroup getNodeGroup(JobId jobId, String nodeGroupName) throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(nodeGroupName);
NodeGroupTupleTranslator tupleReaderWriter = new NodeGroupTupleTranslator(false);
IValueExtractor<NodeGroup> valueExtractor = new MetadataEntityValueExtractor<NodeGroup>(tupleReaderWriter);
List<NodeGroup> results = new ArrayList<NodeGroup>();
searchIndex(jobId, MetadataPrimaryIndexes.NODEGROUP_DATASET, searchKey, valueExtractor, results);
if (results.isEmpty()) {
return null;
}
return results.get(0);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public Function getFunction(JobId jobId, FunctionSignature functionSignature) throws MetadataException,
RemoteException {
try {
ITupleReference searchKey = createTuple(functionSignature.getNamespace(), functionSignature.getName(), ""
+ functionSignature.getArity());
FunctionTupleTranslator tupleReaderWriter = new FunctionTupleTranslator(false);
List<Function> results = new ArrayList<Function>();
IValueExtractor<Function> valueExtractor = new MetadataEntityValueExtractor<Function>(tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, searchKey, valueExtractor, results);
if (results.isEmpty()) {
return null;
}
return results.get(0);
} catch (Exception e) {
e.printStackTrace();
throw new MetadataException(e);
}
}
@Override
public void dropFunction(JobId jobId, FunctionSignature functionSignature) throws MetadataException,
RemoteException {
Function function;
try {
function = getFunction(jobId, functionSignature);
} catch (Exception e) {
throw new MetadataException(e);
}
if (function == null) {
throw new MetadataException("Cannot drop function '" + functionSignature.toString()
+ "' because it doesn't exist.");
}
try {
// Delete entry from the 'function' dataset.
ITupleReference searchKey = createTuple(functionSignature.getNamespace(), functionSignature.getName(), ""
+ functionSignature.getArity());
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'function' dataset.
ITupleReference functionTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET,
searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, functionTuple);
// TODO: Change this to be a BTree specific exception, e.g.,
// BTreeKeyDoesNotExistException.
} catch (TreeIndexException e) {
throw new MetadataException("There is no function with the name " + functionSignature.getName()
+ " and arity " + functionSignature.getArity(), e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
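// Finds the single tuple matching searchKey so it can be deleted. The
// extractor returns a copy of the tuple, so the reference stays valid after
// the search cursor that produced it is closed.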
private ITupleReference getTupleToBeDeleted(JobId jobId, IMetadataIndex metadataIndex, ITupleReference searchKey)
throws Exception {
IValueExtractor<ITupleReference> valueExtractor = new TupleCopyValueExtractor(metadataIndex.getTypeTraits());
List<ITupleReference> results = new ArrayList<ITupleReference>();
searchIndex(jobId, metadataIndex, searchKey, valueExtractor, results);
if (results.isEmpty()) {
// TODO: Temporarily a TreeIndexException to make it get caught by
// caller in the appropriate catch block.
throw new TreeIndexException("Could not find entry to be deleted.");
}
// There should be exactly one result returned from the search.
return results.get(0);
}
// Debugging method: dumps the contents of the Dataverse, Dataset, and Index
// metadata B-trees as a string.
public String printMetadata() {
StringBuilder sb = new StringBuilder();
try {
IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET;
long resourceID = index.getResourceID();
IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
indexLifecycleManager.open(resourceID);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
indexAccessor.search(rangeCursor, rangePred);
try {
while (rangeCursor.hasNext()) {
rangeCursor.next();
sb.append(TupleUtils.printTuple(rangeCursor.getTuple(),
new ISerializerDeserializer[] { AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ASTRING) }));
}
} finally {
rangeCursor.close();
}
indexLifecycleManager.close(resourceID);
index = MetadataPrimaryIndexes.DATASET_DATASET;
resourceID = index.getResourceID();
indexInstance = indexLifecycleManager.getIndex(resourceID);
indexLifecycleManager.open(resourceID);
indexAccessor = indexInstance
.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
rangePred = new RangePredicate(null, null, true, true, null, null);
indexAccessor.search(rangeCursor, rangePred);
try {
while (rangeCursor.hasNext()) {
rangeCursor.next();
sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] {
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) }));
}
} finally {
rangeCursor.close();
}
indexLifecycleManager.close(resourceID);
index = MetadataPrimaryIndexes.INDEX_DATASET;
resourceID = index.getResourceID();
indexInstance = indexLifecycleManager.getIndex(resourceID);
indexLifecycleManager.open(resourceID);
indexAccessor = indexInstance
.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
rangePred = new RangePredicate(null, null, true, true, null, null);
indexAccessor.search(rangeCursor, rangePred);
try {
while (rangeCursor.hasNext()) {
rangeCursor.next();
sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] {
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) }));
}
} finally {
rangeCursor.close();
}
indexLifecycleManager.close(resourceID);
} catch (Exception e) {
e.printStackTrace();
}
return sb.toString();
}
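// Core read path: scans the given metadata index with a range cursor. A null
// searchKey scans the whole index; otherwise an inclusive exact-match
// predicate is built over the key's fields. Each matching tuple is handed to
// the value extractor and non-null results are collected.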
private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception {
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
long resourceID = index.getResourceID();
IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
indexLifecycleManager.open(resourceID);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false);
IBinaryComparator[] searchCmps = null;
MultiComparator searchCmp = null;
if (searchKey != null) {
searchCmps = new IBinaryComparator[searchKey.getFieldCount()];
for (int i = 0; i < searchKey.getFieldCount(); i++) {
searchCmps[i] = comparatorFactories[i].createBinaryComparator();
}
searchCmp = new MultiComparator(searchCmps);
}
RangePredicate rangePred = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp);
indexAccessor.search(rangeCursor, rangePred);
try {
while (rangeCursor.hasNext()) {
rangeCursor.next();
ResultType result = valueExtractor.getValue(jobId, rangeCursor.getTuple());
if (result != null) {
results.add(result);
}
}
} finally {
rangeCursor.close();
}
indexLifecycleManager.close(resourceID);
}
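// Scans the 'Dataset' dataset for the highest dataset id in use and seeds
// the DatasetIdFactory with it, so newly created datasets get fresh ids.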
@Override
public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException {
int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID;
long resourceID = MetadataPrimaryIndexes.DATASET_DATASET.getResourceID();
try {
IIndex indexInstance = indexLifecycleManager.getIndex(resourceID);
indexLifecycleManager.open(resourceID);
try {
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
NoOpOperationCallback.INSTANCE);
IIndexCursor rangeCursor = indexAccessor.createSearchCursor(false);
DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(false);
IValueExtractor<Dataset> valueExtractor = new MetadataEntityValueExtractor<Dataset>(tupleReaderWriter);
RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null);
indexAccessor.search(rangeCursor, rangePred);
int datasetId;
try {
while (rangeCursor.hasNext()) {
rangeCursor.next();
final ITupleReference ref = rangeCursor.getTuple();
final Dataset ds = valueExtractor.getValue(jobId, ref);
datasetId = ds.getDatasetId();
if (mostRecentDatasetId < datasetId) {
mostRecentDatasetId = datasetId;
}
}
} finally {
rangeCursor.close();
}
} finally {
indexLifecycleManager.close(resourceID);
}
} catch (Exception e) {
throw new MetadataException(e);
}
DatasetIdFactory.initialize(mostRecentDatasetId);
}
// TODO: Can use Hyracks' TupleUtils for this, once we switch to a newer
// Hyracks version.
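// Builds a tuple whose fields are the given strings, each serialized as an
// ASTRING; used for search keys and secondary-index entries.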
public ITupleReference createTuple(String... fields) throws HyracksDataException {
ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ASTRING);
AMutableString aString = new AMutableString("");
ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fields.length);
for (String s : fields) {
aString.setValue(s);
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
}
ArrayTupleReference tuple = new ArrayTupleReference();
tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
return tuple;
}
@Override
public List<Function> getDataverseFunctions(JobId jobId, String dataverseName) throws MetadataException,
RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName);
FunctionTupleTranslator tupleReaderWriter = new FunctionTupleTranslator(false);
IValueExtractor<Function> valueExtractor = new MetadataEntityValueExtractor<Function>(tupleReaderWriter);
List<Function> results = new ArrayList<Function>();
searchIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, searchKey, valueExtractor, results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void addAdapter(JobId jobId, DatasourceAdapter adapter) throws MetadataException, RemoteException {
try {
// Insert into the 'Adapter' dataset.
DatasourceAdapterTupleTranslator tupleReaderWriter = new DatasourceAdapterTupleTranslator(true);
ITupleReference adapterTuple = tupleReaderWriter.getTupleFromMetadataEntity(adapter);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, adapterTuple);
} catch (TreeIndexDuplicateKeyException e) {
throw new MetadataException("A adapter with this name " + adapter.getAdapterIdentifier().getAdapterName()
+ " already exists in dataverse '" + adapter.getAdapterIdentifier().getNamespace() + "'.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void dropAdapter(JobId jobId, String dataverseName, String adapterName) throws MetadataException,
RemoteException {
DatasourceAdapter adapter;
try {
adapter = getAdapter(jobId, dataverseName, adapterName);
} catch (Exception e) {
throw new MetadataException(e);
}
if (adapter == null) {
throw new MetadataException("Cannot drop adapter '" + adapter + "' because it doesn't exist.");
}
try {
// Delete entry from the 'Adapter' dataset.
ITupleReference searchKey = createTuple(dataverseName, adapterName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'Adapter' dataset.
ITupleReference datasetTuple = getTupleToBeDeleted(jobId,
MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, datasetTuple);
// TODO: Change this to be a BTree specific exception, e.g.,
// BTreeKeyDoesNotExistException.
} catch (TreeIndexException e) {
throw new MetadataException("Cannot drop adapter '" + adapterName, e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public DatasourceAdapter getAdapter(JobId jobId, String dataverseName, String adapterName)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName, adapterName);
DatasourceAdapterTupleTranslator tupleReaderWriter = new DatasourceAdapterTupleTranslator(false);
List<DatasourceAdapter> results = new ArrayList<DatasourceAdapter>();
IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<DatasourceAdapter>(
tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey, valueExtractor, results);
if (results.isEmpty()) {
return null;
}
return results.get(0);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void addCompactionPolicy(JobId jobId, CompactionPolicy compactionPolicy) throws MetadataException,
RemoteException {
try {
// Insert into the 'CompactionPolicy' dataset.
CompactionPolicyTupleTranslator tupleReaderWriter = new CompactionPolicyTupleTranslator(true);
ITupleReference compactionPolicyTuple = tupleReaderWriter.getTupleFromMetadataEntity(compactionPolicy);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, compactionPolicyTuple);
} catch (TreeIndexDuplicateKeyException e) {
throw new MetadataException("A compcation policy with this name " + compactionPolicy.getPolicyName()
+ " already exists in dataverse '" + compactionPolicy.getPolicyName() + "'.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public CompactionPolicy getCompactionPolicy(JobId jobId, String dataverse, String policyName)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverse, policyName);
CompactionPolicyTupleTranslator tupleReaderWriter = new CompactionPolicyTupleTranslator(false);
List<CompactionPolicy> results = new ArrayList<CompactionPolicy>();
IValueExtractor<CompactionPolicy> valueExtractor = new MetadataEntityValueExtractor<CompactionPolicy>(
tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, searchKey, valueExtractor, results);
if (!results.isEmpty()) {
return results.get(0);
}
return null;
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public List<DatasourceAdapter> getDataverseAdapters(JobId jobId, String dataverseName) throws MetadataException,
RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName);
DatasourceAdapterTupleTranslator tupleReaderWriter = new DatasourceAdapterTupleTranslator(false);
IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<DatasourceAdapter>(
tupleReaderWriter);
List<DatasourceAdapter> results = new ArrayList<DatasourceAdapter>();
searchIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey, valueExtractor, results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void addLibrary(JobId jobId, Library library) throws MetadataException, RemoteException {
try {
// Insert into the 'Library' dataset.
LibraryTupleTranslator tupleReaderWriter = new LibraryTupleTranslator(true);
ITupleReference libraryTuple = tupleReaderWriter.getTupleFromMetadataEntity(library);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, libraryTuple);
} catch (TreeIndexException e) {
throw new MetadataException("A library with this name " + library.getDataverseName()
+ " already exists in dataverse '" + library.getDataverseName() + "'.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public void dropLibrary(JobId jobId, String dataverseName, String libraryName) throws MetadataException,
RemoteException {
Library library;
try {
library = getLibrary(jobId, dataverseName, libraryName);
} catch (Exception e) {
throw new MetadataException(e);
}
if (library == null) {
throw new MetadataException("Cannot drop library '" + library + "' because it doesn't exist.");
}
try {
// Delete entry from the 'Library' dataset.
ITupleReference searchKey = createTuple(dataverseName, libraryName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'Adapter' dataset.
ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, datasetTuple);
// TODO: Change this to be a BTree specific exception, e.g.,
// BTreeKeyDoesNotExistException.
} catch (TreeIndexException e) {
throw new MetadataException("Cannot drop library '" + libraryName, e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public Library getLibrary(JobId jobId, String dataverseName, String libraryName) throws MetadataException,
RemoteException {
try {
ITupleReference searchKey = createTuple(dataverseName, libraryName);
LibraryTupleTranslator tupleReaderWriter = new LibraryTupleTranslator(false);
List<Library> results = new ArrayList<Library>();
IValueExtractor<Library> valueExtractor = new MetadataEntityValueExtractor<Library>(tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, searchKey, valueExtractor, results);
if (results.isEmpty()) {
return null;
}
return results.get(0);
} catch (Exception e) {
throw new MetadataException(e);
}
}
public int getMostRecentDatasetId() throws MetadataException, RemoteException {
return DatasetIdFactory.getMostRecentDatasetId();
}
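// Records a feed activity, lazily seeding the FeedActivityIdFactory from the
// persisted activities before assigning the next activity id.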
@Override
public void registerFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivity feedActivity)
throws MetadataException, RemoteException {
try {
if (!FeedActivityIdFactory.isInitialized()) {
initializeFeedActivityIdFactory(jobId);
}
feedActivity.setActivityId(FeedActivityIdFactory.generateFeedActivityId());
FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(feedActivity);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, tuple);
} catch (Exception e) {
throw new MetadataException(e);
}
}
@Override
public FeedActivity getRecentFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivityType... activityType)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(feedId.getDataverse(), feedId.getFeedName(),
feedId.getDatasetName());
FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false);
List<FeedActivity> results = new ArrayList<FeedActivity>();
IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
if (!results.isEmpty()) {
Collections.sort(results);
if (activityType == null || activityType.length == 0) {
return results.get(0);
} else {
for (FeedActivity result : results) {
for (FeedActivityType ft : activityType) {
if (result.getActivityType().equals(ft)) {
return result;
}
}
}
}
}
return null;
} catch (Exception e) {
throw new MetadataException(e);
}
}
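/**
* Seeds the {@link FeedActivityIdFactory} with the maximum activity id found
* among the persisted feed activities, so that newly generated ids do not
* collide with existing ones.
*/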
@Override
public void initializeFeedActivityIdFactory(JobId jobId) throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple();
FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
List<FeedActivity> results = new ArrayList<FeedActivity>();
IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
int maxActivityId = 0;
for (FeedActivity fa : results) {
if (maxActivityId < fa.getActivityId()) {
maxActivityId = fa.getActivityId();
}
}
FeedActivityIdFactory.initialize(maxActivityId);
} catch (Exception e) {
throw new MetadataException(e);
}
}
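/**
* Inserts the given feed policy into the 'FeedPolicy' metadata dataset.
*
* @throws MetadataException
* if a policy with the same name already exists in the dataverse.
*/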
@Override
public void addFeedPolicy(JobId jobId, FeedPolicy feedPolicy) throws MetadataException, RemoteException {
try {
// Insert into the 'FeedPolicy' dataset.
FeedPolicyTupleTranslator tupleReaderWriter = new FeedPolicyTupleTranslator(true);
ITupleReference feedPolicyTuple = tupleReaderWriter.getTupleFromMetadataEntity(feedPolicy);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, feedPolicyTuple);
} catch (TreeIndexException e) {
throw new MetadataException("A feed policy with this name " + feedPolicy.getPolicyName()
+ " already exists in dataverse '" + feedPolicy.getPolicyName() + "'.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
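/**
* Looks up a feed policy by (dataverse, policy name).
*
* @return the matching {@link FeedPolicy}, or null if none exists.
*/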
@Override
public FeedPolicy getFeedPolicy(JobId jobId, String dataverse, String policyName) throws MetadataException,
RemoteException {
try {
ITupleReference searchKey = createTuple(dataverse, policyName);
FeedPolicyTupleTranslator tupleReaderWriter = new FeedPolicyTupleTranslator(false);
List<FeedPolicy> results = new ArrayList<FeedPolicy>();
IValueExtractor<FeedPolicy> valueExtractor = new MetadataEntityValueExtractor<FeedPolicy>(tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, searchKey, valueExtractor, results);
if (!results.isEmpty()) {
return results.get(0);
}
return null;
} catch (Exception e) {
throw new MetadataException(e);
}
}
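/**
* Scans all persisted feed activities (most recent first) and returns, for
* each feed connection, the latest FEED_BEGIN activity that has not been
* followed by a FEED_END. Passing both a dataverse and a dataset name
* restricts the result to that connection; passing a dataset without a
* dataverse is rejected as invalid.
*/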
@Override
public List<FeedActivity> getActiveFeeds(JobId jobId, String dataverse, String dataset) throws MetadataException,
RemoteException {
List<FeedActivity> activeFeeds = new ArrayList<FeedActivity>();
Map<FeedConnectionId, FeedActivity> aFeeds = new HashMap<FeedConnectionId, FeedActivity>();
boolean invalidArgs = (dataverse == null && dataset != null);
if (invalidArgs) {
throw new MetadataException("Invalid arguments: dataset '" + dataset + "' was specified without a dataverse.");
}
try {
ITupleReference searchKey = createTuple();
FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true);
List<FeedActivity> results = new ArrayList<FeedActivity>();
IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
Collections.sort(results); // recent activity first
FeedConnectionId fid = null;
Set<FeedConnectionId> terminatedFeeds = new HashSet<FeedConnectionId>();
for (FeedActivity fa : results) {
if (dataverse != null) {
if (dataset != null
&& (!fa.getDataverseName().equals(dataverse) || !dataset.equals(fa.getDatasetName()))) {
continue;
}
}
fid = new FeedConnectionId(fa.getDataverseName(), fa.getFeedName(), fa.getDatasetName());
switch (fa.getActivityType()) {
case FEED_BEGIN:
if (!terminatedFeeds.contains(fid)) {
if (aFeeds.get(fid) == null || fa.getActivityId() > aFeeds.get(fid).getActivityId()) {
aFeeds.put(fid, fa);
}
}
break;
case FEED_END:
terminatedFeeds.add(fid);
break;
default: //ignore
}
}
activeFeeds.addAll(aFeeds.values());
return activeFeeds;
} catch (Exception e) {
throw new MetadataException(e);
}
}
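/**
* Inserts the given feed into the 'Feed' metadata dataset.
*
* @throws MetadataException
* if a feed with the same name already exists in the dataverse.
*/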
@Override
public void addFeed(JobId jobId, Feed feed) throws MetadataException, RemoteException {
try {
// Insert into the 'Feed' dataset.
FeedTupleTranslator tupleReaderWriter = new FeedTupleTranslator(true);
ITupleReference feedTuple = tupleReaderWriter.getTupleFromMetadataEntity(feed);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_DATASET, feedTuple);
} catch (TreeIndexException e) {
throw new MetadataException("A feed with this name " + feed.getFeedName()
+ " already exists in dataverse '" + feed.getDataverseName() + "'.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
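/**
* Looks up a feed by (dataverse, feed name).
*
* @return the matching {@link Feed}, or null if none exists.
*/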
@Override
public Feed getFeed(JobId jobId, String dataverse, String feedName) throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverse, feedName);
FeedTupleTranslator tupleReaderWriter = new FeedTupleTranslator(false);
List<Feed> results = new ArrayList<Feed>();
IValueExtractor<Feed> valueExtractor = new MetadataEntityValueExtractor<Feed>(tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.FEED_DATASET, searchKey, valueExtractor, results);
if (!results.isEmpty()) {
return results.get(0);
}
return null;
} catch (Exception e) {
throw new MetadataException(e);
}
}
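/**
* Drops the feed with the given name from the given dataverse by removing
* its entry from the 'Feed' metadata dataset.
*
* @throws MetadataException
* if no such feed exists, or if the deletion fails.
*/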
@Override
public void dropFeed(JobId jobId, String dataverse, String feedName) throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataverse, feedName);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'Feed' dataset.
ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FEED_DATASET, searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FEED_DATASET, tuple);
// TODO: Change this to be a BTree specific exception, e.g.,
// BTreeKeyDoesNotExistException.
} catch (TreeIndexException e) {
throw new MetadataException("Cannot drop feed '" + feedName + "' because it doesn't exist", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
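/**
* Returns, for the given feed, the most recent FEED_BEGIN activity of each
* dataset the feed is currently connected to, skipping datasets whose most
* recent activity is a FEED_END.
*/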
public List<FeedActivity> getDatasetsServedByFeed(JobId jobId, String dataverse, String feedName)
throws MetadataException, RemoteException {
List<FeedActivity> feedActivities = new ArrayList<FeedActivity>();
try {
ITupleReference searchKey = createTuple(dataverse, feedName);
FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false);
List<FeedActivity> results = new ArrayList<FeedActivity>();
IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>(
tupleReaderWriter);
searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results);
if (!results.isEmpty()) {
Collections.sort(results); // most recent feed activity first
Set<String> terminatedDatasets = new HashSet<String>();
Set<String> activeDatasets = new HashSet<String>();
for (FeedActivity result : results) {
switch (result.getActivityType()) {
case FEED_BEGIN:
if (!terminatedDatasets.contains(result.getDatasetName())) {
feedActivities.add(result);
activeDatasets.add(result.getDatasetName());
}
break;
case FEED_END:
if (!activeDatasets.contains(result.getDatasetName())) {
terminatedDatasets.add(result.getDatasetName());
}
break;
}
}
}
return feedActivities;
} catch (Exception e) {
throw new MetadataException(e);
}
}
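/**
* Inserts the given external file into the 'ExternalFile' metadata dataset.
*
* @throws MetadataException
* if a file with the same number is already registered for the dataset.
*/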
@Override
public void addExternalFile(JobId jobId, ExternalFile externalFile) throws MetadataException, RemoteException {
try {
// Insert into the 'externalFiles' dataset.
ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(true);
ITupleReference externalFileTuple = tupleReaderWriter.getTupleFromMetadataEntity(externalFile);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, externalFileTuple);
} catch (TreeIndexDuplicateKeyException e) {
throw new MetadataException("An externalFile with this number " + externalFile.getFileNumber()
+ " already exists in dataset '" + externalFile.getDatasetName() + "' in dataverse '"
+ externalFile.getDataverseName() + "'.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
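/**
* Returns all external files registered for the given dataset.
*/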
@Override
public List<ExternalFile> getExternalFiles(JobId jobId, Dataset dataset) throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createTuple(dataset.getDataverseName(), dataset.getDatasetName());
ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(false);
IValueExtractor<ExternalFile> valueExtractor = new MetadataEntityValueExtractor<ExternalFile>(
tupleReaderWriter);
List<ExternalFile> results = new ArrayList<ExternalFile>();
searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey, valueExtractor, results);
return results;
} catch (Exception e) {
throw new MetadataException(e);
}
}
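/**
* Drops the external file identified by (dataverse, dataset, file number)
* from the 'ExternalFile' metadata dataset.
*
* @throws MetadataException
* if no such file exists, or if the deletion fails.
*/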
@Override
public void dropExternalFile(JobId jobId, String dataverseName, String datasetName, int fileNumber)
throws MetadataException, RemoteException {
try {
// Delete entry from the 'ExternalFile' dataset.
ITupleReference searchKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber);
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'ExternalFile' dataset.
ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET,
searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, datasetTuple);
} catch (TreeIndexException e) {
throw new MetadataException("Couldn't drop externalFile.", e);
} catch (Exception e) {
throw new MetadataException(e);
}
}
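/**
* Fetches all external files registered for the given dataset and drops
* them one at a time.
*/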
@Override
public void dropExternalFiles(JobId jobId, Dataset dataset) throws MetadataException, RemoteException {
List<ExternalFile> files;
try {
files = getExternalFiles(jobId, dataset);
} catch (Exception e) {
throw new MetadataException(e);
}
try {
// Loop through the files and delete them one by one.
for (ExternalFile file : files) {
dropExternalFile(jobId, file.getDataverseName(), file.getDatasetName(), file.getFileNumber());
}
} catch (Exception e) {
throw new MetadataException(e);
}
}
/**
* Builds a search tuple for the 'ExternalFile' dataset. Unlike the other
* metadata search keys, this one ends with an integer field (the file
* number), so the string-based createTuple(...) helper cannot be used.
*/
@SuppressWarnings("unchecked")
public ITupleReference createExternalFileSearchTuple(String dataverseName, String datasetName, int fileNumber)
throws HyracksDataException {
ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ASTRING);
ISerializerDeserializer<AInt32> intSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.AINT32);
AMutableString aString = new AMutableString("");
ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(3);
//dataverse field
aString.setValue(dataverseName);
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
//dataset field
aString.setValue(datasetName);
stringSerde.serialize(aString, tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
//file number field
intSerde.serialize(new AInt32(fileNumber), tupleBuilder.getDataOutput());
tupleBuilder.addFieldEndOffset();
ArrayTupleReference tuple = new ArrayTupleReference();
tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray());
return tuple;
}
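/**
* Looks up a single external file by (dataverse, dataset, file number).
*
* @return the matching {@link ExternalFile}, or null if none exists.
*/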
@Override
public ExternalFile getExternalFile(JobId jobId, String dataverseName, String datasetName, Integer fileNumber)
throws MetadataException, RemoteException {
try {
ITupleReference searchKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber);
ExternalFileTupleTranslator tupleReaderWriter = new ExternalFileTupleTranslator(false);
IValueExtractor<ExternalFile> valueExtractor = new MetadataEntityValueExtractor<ExternalFile>(
tupleReaderWriter);
List<ExternalFile> results = new ArrayList<ExternalFile>();
searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, searchKey, valueExtractor, results);
if (results.isEmpty()) {
return null;
}
return results.get(0);
} catch (Exception e) {
throw new MetadataException(e);
}
}
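/**
* Replaces the metadata entry of the given dataset with a new one: the old
* tuple is deleted from the 'Dataset' metadata dataset and a tuple built
* from the given {@link Dataset} is inserted in its place. The delete and
* insert are both performed under the metadata transaction identified by
* jobId.
*/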
@Override
public void updateDataset(JobId jobId, Dataset dataset) throws MetadataException, RemoteException {
try {
// Replace the existing entry for this dataset with the new one.
// First, delete the old entry from the 'dataset' dataset.
ITupleReference searchKey = createTuple(dataset.getDataverseName(), dataset.getDatasetName());
// Searches the index for the tuple to be deleted. Acquires an S
// lock on the 'dataset' dataset.
ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASET_DATASET, searchKey);
deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, datasetTuple);
// Previous tuple was deleted
// Insert into the 'dataset' dataset.
DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(true);
datasetTuple = tupleReaderWriter.getTupleFromMetadataEntity(dataset);
insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, datasetTuple);
} catch (Exception e) {
throw new MetadataException(e);
}
}
}