| /* |
| * Copyright 2009-2013 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package edu.uci.ics.asterix.metadata; |
| |
| import java.rmi.RemoteException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import edu.uci.ics.asterix.common.api.IAsterixAppRuntimeContext; |
| import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType; |
| import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType; |
| import edu.uci.ics.asterix.common.exceptions.ACIDException; |
| import edu.uci.ics.asterix.common.exceptions.AsterixException; |
| import edu.uci.ics.asterix.common.feeds.FeedConnectionId; |
| import edu.uci.ics.asterix.common.functions.FunctionSignature; |
| import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback; |
| import edu.uci.ics.asterix.common.transactions.DatasetId; |
| import edu.uci.ics.asterix.common.transactions.IRecoveryManager.ResourceType; |
| import edu.uci.ics.asterix.common.transactions.ITransactionContext; |
| import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem; |
| import edu.uci.ics.asterix.common.transactions.JobId; |
| import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider; |
| import edu.uci.ics.asterix.metadata.api.IMetadataIndex; |
| import edu.uci.ics.asterix.metadata.api.IMetadataNode; |
| import edu.uci.ics.asterix.metadata.api.IValueExtractor; |
| import edu.uci.ics.asterix.metadata.bootstrap.MetadataPrimaryIndexes; |
| import edu.uci.ics.asterix.metadata.bootstrap.MetadataSecondaryIndexes; |
| import edu.uci.ics.asterix.metadata.entities.CompactionPolicy; |
| import edu.uci.ics.asterix.metadata.entities.Dataset; |
| import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter; |
| import edu.uci.ics.asterix.metadata.entities.Datatype; |
| import edu.uci.ics.asterix.metadata.entities.Dataverse; |
| import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails; |
| import edu.uci.ics.asterix.metadata.entities.ExternalFile; |
| import edu.uci.ics.asterix.metadata.entities.Feed; |
| import edu.uci.ics.asterix.metadata.entities.FeedActivity; |
| import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType; |
| import edu.uci.ics.asterix.metadata.entities.FeedPolicy; |
| import edu.uci.ics.asterix.metadata.entities.Function; |
| import edu.uci.ics.asterix.metadata.entities.Index; |
| import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails; |
| import edu.uci.ics.asterix.metadata.entities.Library; |
| import edu.uci.ics.asterix.metadata.entities.Node; |
| import edu.uci.ics.asterix.metadata.entities.NodeGroup; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.CompactionPolicyTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.DatasetTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.DatasourceAdapterTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.DatatypeTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.DataverseTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.ExternalFileTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedActivityTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedPolicyTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.FeedTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.FunctionTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.IndexTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.LibraryTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeGroupTupleTranslator; |
| import edu.uci.ics.asterix.metadata.entitytupletranslators.NodeTupleTranslator; |
| import edu.uci.ics.asterix.metadata.feeds.FeedActivityIdFactory; |
| import edu.uci.ics.asterix.metadata.valueextractors.DatasetNameValueExtractor; |
| import edu.uci.ics.asterix.metadata.valueextractors.DatatypeNameValueExtractor; |
| import edu.uci.ics.asterix.metadata.valueextractors.MetadataEntityValueExtractor; |
| import edu.uci.ics.asterix.metadata.valueextractors.NestedDatatypeNameValueExtractor; |
| import edu.uci.ics.asterix.metadata.valueextractors.TupleCopyValueExtractor; |
| import edu.uci.ics.asterix.om.base.AInt32; |
| import edu.uci.ics.asterix.om.base.AMutableString; |
| import edu.uci.ics.asterix.om.base.AString; |
| import edu.uci.ics.asterix.om.types.BuiltinType; |
| import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback; |
| import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback; |
| import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetIdFactory; |
| import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparator; |
| import edu.uci.ics.hyracks.api.dataflow.value.IBinaryComparatorFactory; |
| import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer; |
| import edu.uci.ics.hyracks.api.exceptions.HyracksDataException; |
| import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleBuilder; |
| import edu.uci.ics.hyracks.dataflow.common.comm.io.ArrayTupleReference; |
| import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference; |
| import edu.uci.ics.hyracks.dataflow.common.util.TupleUtils; |
| import edu.uci.ics.hyracks.storage.am.btree.impls.RangePredicate; |
| import edu.uci.ics.hyracks.storage.am.common.api.IIndex; |
| import edu.uci.ics.hyracks.storage.am.common.api.IIndexAccessor; |
| import edu.uci.ics.hyracks.storage.am.common.api.IIndexCursor; |
| import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager; |
| import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback; |
| import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexCursor; |
| import edu.uci.ics.hyracks.storage.am.common.api.TreeIndexException; |
| import edu.uci.ics.hyracks.storage.am.common.exceptions.TreeIndexDuplicateKeyException; |
| import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallback; |
| import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation; |
| import edu.uci.ics.hyracks.storage.am.common.ophelpers.MultiComparator; |
| import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex; |
| import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; |
| |
| public class MetadataNode implements IMetadataNode { |
// Serialization version identifier for this class.
private static final long serialVersionUID = 1L;

// Dataset id passed to the lock manager by lock() and unlock().
private static final DatasetId METADATA_DATASET_ID = new DatasetId(MetadataPrimaryIndexes.METADATA_DATASET_ID);

// Both collaborators are assigned by initialize(); they stay null until it has been called.
private IIndexLifecycleManager indexLifecycleManager;
private ITransactionSubsystem transactionSubsystem;

// Shared instance; the constructor is private, so no other instance can be created from outside this class.
public static final MetadataNode INSTANCE = new MetadataNode();
| |
| private MetadataNode() { |
| super(); |
| } |
| |
| public void initialize(IAsterixAppRuntimeContext runtimeContext) { |
| this.transactionSubsystem = runtimeContext.getTransactionSubsystem(); |
| this.indexLifecycleManager = runtimeContext.getIndexLifecycleManager(); |
| } |
| |
| @Override |
| public void beginTransaction(JobId transactionId) throws ACIDException, RemoteException { |
| ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().beginTransaction(transactionId); |
| txnCtx.setMetadataTransaction(true); |
| } |
| |
| @Override |
| public void commitTransaction(JobId jobId) throws RemoteException, ACIDException { |
| ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); |
| transactionSubsystem.getTransactionManager().commitTransaction(txnCtx, new DatasetId(-1), -1); |
| } |
| |
| @Override |
| public void abortTransaction(JobId jobId) throws RemoteException, ACIDException { |
| try { |
| ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, |
| false); |
| transactionSubsystem.getTransactionManager().abortTransaction(txnCtx, new DatasetId(-1), -1); |
| } catch (ACIDException e) { |
| e.printStackTrace(); |
| throw e; |
| } |
| } |
| |
| @Override |
| public void lock(JobId jobId, byte lockMode) throws ACIDException, RemoteException { |
| ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); |
| transactionSubsystem.getLockManager().lock(METADATA_DATASET_ID, -1, lockMode, txnCtx); |
| } |
| |
| @Override |
| public void unlock(JobId jobId, byte lockMode) throws ACIDException, RemoteException { |
| ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); |
| transactionSubsystem.getLockManager().unlock(METADATA_DATASET_ID, -1, lockMode, txnCtx); |
| } |
| |
| @Override |
| public void addDataverse(JobId jobId, Dataverse dataverse) throws MetadataException, RemoteException { |
| try { |
| DataverseTupleTranslator tupleReaderWriter = new DataverseTupleTranslator(true); |
| ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(dataverse); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, tuple); |
| } catch (TreeIndexDuplicateKeyException e) { |
| throw new MetadataException("A dataverse with this name " + dataverse.getDataverseName() |
| + " already exists.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void addDataset(JobId jobId, Dataset dataset) throws MetadataException, RemoteException { |
| try { |
| // Insert into the 'dataset' dataset. |
| DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(true); |
| ITupleReference datasetTuple = tupleReaderWriter.getTupleFromMetadataEntity(dataset); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, datasetTuple); |
| if (dataset.getDatasetType() == DatasetType.INTERNAL) { |
| // Add the primary index for the dataset. |
| InternalDatasetDetails id = (InternalDatasetDetails) dataset.getDatasetDetails(); |
| Index primaryIndex = new Index(dataset.getDataverseName(), dataset.getDatasetName(), |
| dataset.getDatasetName(), IndexType.BTREE, id.getPrimaryKey(), true, dataset.getPendingOp()); |
| |
| addIndex(jobId, primaryIndex); |
| // Add an entry for the node group |
| ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(), |
| dataset.getDatasetName()); |
| insertTupleIntoIndex(jobId, MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX, nodeGroupTuple); |
| } else if (dataset.getDatasetType() == DatasetType.EXTERNAL) { |
| //added for external data |
| ExternalDatasetDetails id = (ExternalDatasetDetails) dataset.getDatasetDetails(); |
| ITupleReference nodeGroupTuple = createTuple(id.getNodeGroupName(), dataset.getDataverseName(), |
| dataset.getDatasetName()); |
| insertTupleIntoIndex(jobId, MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX, nodeGroupTuple); |
| } |
| // Add entry in datatype secondary index. |
| ITupleReference dataTypeTuple = createTuple(dataset.getDataverseName(), dataset.getItemTypeName(), |
| dataset.getDatasetName()); |
| insertTupleIntoIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX, dataTypeTuple); |
| } catch (TreeIndexDuplicateKeyException e) { |
| throw new MetadataException("A dataset with this name " + dataset.getDatasetName() |
| + " already exists in dataverse '" + dataset.getDataverseName() + "'.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void addIndex(JobId jobId, Index index) throws MetadataException, RemoteException { |
| try { |
| IndexTupleTranslator tupleWriter = new IndexTupleTranslator(true); |
| ITupleReference tuple = tupleWriter.getTupleFromMetadataEntity(index); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, tuple); |
| } catch (TreeIndexDuplicateKeyException e) { |
| throw new MetadataException("An index with name '" + index.getIndexName() + "' already exists.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void addNode(JobId jobId, Node node) throws MetadataException, RemoteException { |
| try { |
| NodeTupleTranslator tupleReaderWriter = new NodeTupleTranslator(true); |
| ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(node); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.NODE_DATASET, tuple); |
| } catch (TreeIndexDuplicateKeyException e) { |
| throw new MetadataException("A node with name '" + node.getNodeName() + "' already exists.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void addNodeGroup(JobId jobId, NodeGroup nodeGroup) throws MetadataException, RemoteException { |
| try { |
| NodeGroupTupleTranslator tupleReaderWriter = new NodeGroupTupleTranslator(true); |
| ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(nodeGroup); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.NODEGROUP_DATASET, tuple); |
| } catch (TreeIndexDuplicateKeyException e) { |
| throw new MetadataException("A nodegroup with name '" + nodeGroup.getNodeGroupName() + "' already exists.", |
| e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void addDatatype(JobId jobId, Datatype datatype) throws MetadataException, RemoteException { |
| try { |
| DatatypeTupleTranslator tupleReaderWriter = new DatatypeTupleTranslator(jobId, this, true); |
| ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(datatype); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, tuple); |
| } catch (TreeIndexDuplicateKeyException e) { |
| throw new MetadataException("A datatype with name '" + datatype.getDatatypeName() + "' already exists.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void addFunction(JobId jobId, Function function) throws MetadataException, RemoteException { |
| try { |
| // Insert into the 'function' dataset. |
| FunctionTupleTranslator tupleReaderWriter = new FunctionTupleTranslator(true); |
| ITupleReference functionTuple = tupleReaderWriter.getTupleFromMetadataEntity(function); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, functionTuple); |
| |
| } catch (TreeIndexDuplicateKeyException e) { |
| throw new MetadataException("A function with this name " + function.getName() + " and arity " |
| + function.getArity() + " already exists in dataverse '" + function.getDataverseName() + "'.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| public void insertIntoDatatypeSecondaryIndex(JobId jobId, String dataverseName, String nestedTypeName, |
| String topTypeName) throws Exception { |
| ITupleReference tuple = createTuple(dataverseName, nestedTypeName, topTypeName); |
| insertTupleIntoIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, tuple); |
| } |
| |
| private void insertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple) |
| throws Exception { |
| long resourceID = metadataIndex.getResourceID(); |
| ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID); |
| indexLifecycleManager.open(resourceID); |
| |
| // prepare a Callback for logging |
| IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex, |
| lsmIndex, IndexOperation.INSERT); |
| |
| ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE); |
| |
| ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); |
| txnCtx.setWriteTxn(true); |
| txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback, |
| metadataIndex.isPrimaryIndex()); |
| |
| // TODO: fix exceptions once new BTree exception model is in hyracks. |
| indexAccessor.forceInsert(tuple); |
| |
| indexLifecycleManager.close(resourceID); |
| } |
| |
| private IModificationOperationCallback createIndexModificationCallback(JobId jobId, long resourceId, |
| IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws Exception { |
| ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); |
| |
| if (metadataIndex.isPrimaryIndex()) { |
| return new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(), |
| metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(), |
| transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp); |
| } else { |
| return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(), |
| metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(), |
| transactionSubsystem, resourceId, ResourceType.LSM_BTREE, indexOp); |
| } |
| } |
| |
| @Override |
| public void dropDataverse(JobId jobId, String dataverseName) throws MetadataException, RemoteException { |
| try { |
| |
| List<Dataset> dataverseDatasets; |
| Dataset ds; |
| dataverseDatasets = getDataverseDatasets(jobId, dataverseName); |
| if (dataverseDatasets != null && dataverseDatasets.size() > 0) { |
| // Drop all datasets in this dataverse. |
| for (int i = 0; i < dataverseDatasets.size(); i++) { |
| ds = dataverseDatasets.get(i); |
| dropDataset(jobId, dataverseName, ds.getDatasetName()); |
| } |
| } |
| List<Datatype> dataverseDatatypes; |
| // As a side effect, acquires an S lock on the 'datatype' dataset |
| // on behalf of txnId. |
| dataverseDatatypes = getDataverseDatatypes(jobId, dataverseName); |
| if (dataverseDatatypes != null && dataverseDatatypes.size() > 0) { |
| // Drop all types in this dataverse. |
| for (int i = 0; i < dataverseDatatypes.size(); i++) { |
| forceDropDatatype(jobId, dataverseName, dataverseDatatypes.get(i).getDatatypeName()); |
| } |
| } |
| |
| // As a side effect, acquires an S lock on the 'Function' dataset |
| // on behalf of txnId. |
| List<Function> dataverseFunctions = getDataverseFunctions(jobId, dataverseName); |
| if (dataverseFunctions != null && dataverseFunctions.size() > 0) { |
| // Drop all functions in this dataverse. |
| for (Function function : dataverseFunctions) { |
| dropFunction(jobId, new FunctionSignature(dataverseName, function.getName(), function.getArity())); |
| } |
| } |
| |
| // As a side effect, acquires an S lock on the 'Adapter' dataset |
| // on behalf of txnId. |
| List<DatasourceAdapter> dataverseAdapters = getDataverseAdapters(jobId, dataverseName); |
| if (dataverseAdapters != null && dataverseAdapters.size() > 0) { |
| // Drop all functions in this dataverse. |
| for (DatasourceAdapter adapter : dataverseAdapters) { |
| dropAdapter(jobId, dataverseName, adapter.getAdapterIdentifier().getAdapterName()); |
| } |
| } |
| |
| List<Feed> dataverseFeeds; |
| Feed feed; |
| dataverseFeeds = getDataverseFeeds(jobId, dataverseName); |
| if (dataverseFeeds != null && dataverseFeeds.size() > 0) { |
| // Drop all datasets in this dataverse. |
| for (int i = 0; i < dataverseFeeds.size(); i++) { |
| feed = dataverseFeeds.get(i); |
| dropFeed(jobId, dataverseName, feed.getFeedName()); |
| } |
| } |
| |
| // Delete the dataverse entry from the 'dataverse' dataset. |
| ITupleReference searchKey = createTuple(dataverseName); |
| // As a side effect, acquires an S lock on the 'dataverse' dataset |
| // on behalf of txnId. |
| ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, searchKey); |
| deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, tuple); |
| |
| // TODO: Change this to be a BTree specific exception, e.g., |
| // BTreeKeyDoesNotExistException. |
| } catch (TreeIndexException e) { |
| throw new MetadataException("Cannot drop dataverse '" + dataverseName + "' because it doesn't exist.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void dropDataset(JobId jobId, String dataverseName, String datasetName) throws MetadataException, |
| RemoteException { |
| Dataset dataset; |
| try { |
| dataset = getDataset(jobId, dataverseName, datasetName); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| if (dataset == null) { |
| throw new MetadataException("Cannot drop dataset '" + datasetName + "' because it doesn't exist."); |
| } |
| try { |
| // Delete entry from the 'datasets' dataset. |
| ITupleReference searchKey = createTuple(dataverseName, datasetName); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the 'dataset' dataset. |
| try { |
| ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASET_DATASET, |
| searchKey); |
| deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, datasetTuple); |
| } catch (TreeIndexException tie) { |
| // ignore this exception and continue deleting all relevant |
| // artifacts. |
| } |
| |
| // Delete entry from secondary index 'group'. |
| ITupleReference groupNameSearchKey = createTuple(dataset.getDatasetDetails().getNodeGroupName(), |
| dataverseName, datasetName); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the GROUPNAME_ON_DATASET_INDEX index. |
| try { |
| ITupleReference groupNameTuple = getTupleToBeDeleted(jobId, |
| MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX, groupNameSearchKey); |
| deleteTupleFromIndex(jobId, MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX, groupNameTuple); |
| } catch (TreeIndexException tie) { |
| // ignore this exception and continue deleting all relevant |
| // artifacts. |
| } |
| |
| // Delete entry from secondary index 'type'. |
| ITupleReference dataTypeSearchKey = createTuple(dataverseName, dataset.getItemTypeName(), datasetName); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the DATATYPENAME_ON_DATASET_INDEX index. |
| try { |
| ITupleReference dataTypeTuple = getTupleToBeDeleted(jobId, |
| MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX, dataTypeSearchKey); |
| deleteTupleFromIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX, dataTypeTuple); |
| } catch (TreeIndexException tie) { |
| // ignore this exception and continue deleting all relevant |
| // artifacts. |
| } |
| |
| // Delete entry(s) from the 'indexes' dataset. |
| List<Index> datasetIndexes = getDatasetIndexes(jobId, dataverseName, datasetName); |
| if (datasetIndexes != null) { |
| for (Index index : datasetIndexes) { |
| dropIndex(jobId, dataverseName, datasetName, index.getIndexName()); |
| } |
| } |
| |
| if (dataset.getDatasetType() == DatasetType.EXTERNAL) { |
| // Delete External Files |
| // As a side effect, acquires an S lock on the 'ExternalFile' dataset |
| // on behalf of txnId. |
| List<ExternalFile> datasetFiles = getExternalFiles(jobId, dataset); |
| if (datasetFiles != null && datasetFiles.size() > 0) { |
| // Drop all external files in this dataset. |
| for (ExternalFile file : datasetFiles) { |
| dropExternalFile(jobId, dataverseName, file.getDatasetName(), file.getFileNumber()); |
| } |
| } |
| } |
| |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void dropIndex(JobId jobId, String dataverseName, String datasetName, String indexName) |
| throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, datasetName, indexName); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the 'index' dataset. |
| ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.INDEX_DATASET, searchKey); |
| deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, tuple); |
| // TODO: Change this to be a BTree specific exception, e.g., |
| // BTreeKeyDoesNotExistException. |
| } catch (TreeIndexException e) { |
| throw new MetadataException("Cannot drop index '" + datasetName + "." + indexName |
| + "' because it doesn't exist.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void dropNodegroup(JobId jobId, String nodeGroupName) throws MetadataException, RemoteException { |
| List<String> datasetNames; |
| try { |
| datasetNames = getDatasetNamesPartitionedOnThisNodeGroup(jobId, nodeGroupName); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| if (!datasetNames.isEmpty()) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Nodegroup '" + nodeGroupName |
| + "' cannot be dropped; it was used for partitioning these datasets:"); |
| for (int i = 0; i < datasetNames.size(); i++) |
| sb.append("\n" + (i + 1) + "- " + datasetNames.get(i) + "."); |
| throw new MetadataException(sb.toString()); |
| } |
| try { |
| ITupleReference searchKey = createTuple(nodeGroupName); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the 'nodegroup' dataset. |
| ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.NODEGROUP_DATASET, searchKey); |
| deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.NODEGROUP_DATASET, tuple); |
| // TODO: Change this to be a BTree specific exception, e.g., |
| // BTreeKeyDoesNotExistException. |
| } catch (TreeIndexException e) { |
| throw new MetadataException("Cannot drop nodegroup '" + nodeGroupName + "' because it doesn't exist", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void dropDatatype(JobId jobId, String dataverseName, String datatypeName) throws MetadataException, |
| RemoteException { |
| List<String> datasetNames; |
| List<String> usedDatatypes; |
| try { |
| datasetNames = getDatasetNamesDeclaredByThisDatatype(jobId, dataverseName, datatypeName); |
| usedDatatypes = getDatatypeNamesUsingThisDatatype(jobId, dataverseName, datatypeName); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| // Check whether type is being used by datasets. |
| if (!datasetNames.isEmpty()) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Cannot drop type '" + datatypeName + "'; it was used when creating these datasets:"); |
| for (int i = 0; i < datasetNames.size(); i++) |
| sb.append("\n" + (i + 1) + "- " + datasetNames.get(i) + "."); |
| throw new MetadataException(sb.toString()); |
| } |
| // Check whether type is being used by other types. |
| if (!usedDatatypes.isEmpty()) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Cannot drop type '" + datatypeName + "'; it is used in these datatypes:"); |
| for (int i = 0; i < usedDatatypes.size(); i++) |
| sb.append("\n" + (i + 1) + "- " + usedDatatypes.get(i) + "."); |
| throw new MetadataException(sb.toString()); |
| } |
| // Delete the datatype entry, including all it's nested types. |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, datatypeName); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the 'datatype' dataset. |
| ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey); |
| // This call uses the secondary index on datatype. Get nested types |
| // before deleting entry from secondary index. |
| List<String> nestedTypes = getNestedDatatypeNames(jobId, dataverseName, datatypeName); |
| deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, tuple); |
| deleteFromDatatypeSecondaryIndex(jobId, dataverseName, datatypeName); |
| for (String nestedType : nestedTypes) { |
| Datatype dt = getDatatype(jobId, dataverseName, nestedType); |
| if (dt != null && dt.getIsAnonymous()) { |
| dropDatatype(jobId, dataverseName, dt.getDatatypeName()); |
| } |
| } |
| // TODO: Change this to be a BTree specific exception, e.g., |
| // BTreeKeyDoesNotExistException. |
| } catch (TreeIndexException e) { |
| throw new MetadataException("Cannot drop type '" + datatypeName + "' because it doesn't exist", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| private void forceDropDatatype(JobId jobId, String dataverseName, String datatypeName) throws AsterixException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, datatypeName); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the 'datatype' dataset. |
| ITupleReference tuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey); |
| deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, tuple); |
| deleteFromDatatypeSecondaryIndex(jobId, dataverseName, datatypeName); |
| // TODO: Change this to be a BTree specific exception, e.g., |
| // BTreeKeyDoesNotExistException. |
| } catch (TreeIndexException e) { |
| throw new AsterixException("Cannot drop type '" + datatypeName + "' because it doesn't exist", e); |
| } catch (AsterixException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new AsterixException(e); |
| } |
| } |
| |
| private void deleteFromDatatypeSecondaryIndex(JobId jobId, String dataverseName, String datatypeName) |
| throws AsterixException { |
| try { |
| List<String> nestedTypes = getNestedDatatypeNames(jobId, dataverseName, datatypeName); |
| for (String nestedType : nestedTypes) { |
| ITupleReference searchKey = createTuple(dataverseName, nestedType, datatypeName); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the DATATYPENAME_ON_DATATYPE_INDEX index. |
| ITupleReference tuple = getTupleToBeDeleted(jobId, |
| MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, searchKey); |
| deleteTupleFromIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, tuple); |
| } |
| // TODO: Change this to be a BTree specific exception, e.g., |
| // BTreeKeyDoesNotExistException. |
| } catch (TreeIndexException e) { |
| throw new AsterixException("Cannot drop type '" + datatypeName + "' because it doesn't exist", e); |
| } catch (AsterixException e) { |
| throw e; |
| } catch (Exception e) { |
| throw new AsterixException(e); |
| } |
| } |
| |
| private void deleteTupleFromIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple) |
| throws Exception { |
| long resourceID = metadataIndex.getResourceID(); |
| ILSMIndex lsmIndex = (ILSMIndex) indexLifecycleManager.getIndex(resourceID); |
| indexLifecycleManager.open(resourceID); |
| // prepare a Callback for logging |
| IModificationOperationCallback modCallback = createIndexModificationCallback(jobId, resourceID, metadataIndex, |
| lsmIndex, IndexOperation.DELETE); |
| ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE); |
| |
| ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false); |
| txnCtx.setWriteTxn(true); |
| txnCtx.registerIndexAndCallback(resourceID, lsmIndex, (AbstractOperationCallback) modCallback, |
| metadataIndex.isPrimaryIndex()); |
| |
| indexAccessor.forceDelete(tuple); |
| indexLifecycleManager.close(resourceID); |
| } |
| |
| @Override |
| public List<Dataverse> getDataverses(JobId jobId) throws MetadataException, RemoteException { |
| try { |
| DataverseTupleTranslator tupleReaderWriter = new DataverseTupleTranslator(false); |
| IValueExtractor<Dataverse> valueExtractor = new MetadataEntityValueExtractor<Dataverse>(tupleReaderWriter); |
| List<Dataverse> results = new ArrayList<Dataverse>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, null, valueExtractor, results); |
| if (results.isEmpty()) { |
| return null; |
| } |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| |
| } |
| |
| @Override |
| public Dataverse getDataverse(JobId jobId, String dataverseName) throws MetadataException, RemoteException { |
| |
| try { |
| ITupleReference searchKey = createTuple(dataverseName); |
| DataverseTupleTranslator tupleReaderWriter = new DataverseTupleTranslator(false); |
| IValueExtractor<Dataverse> valueExtractor = new MetadataEntityValueExtractor<Dataverse>(tupleReaderWriter); |
| List<Dataverse> results = new ArrayList<Dataverse>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.DATAVERSE_DATASET, searchKey, valueExtractor, results); |
| if (results.isEmpty()) { |
| return null; |
| } |
| return results.get(0); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| |
| } |
| |
| @Override |
| public List<Dataset> getDataverseDatasets(JobId jobId, String dataverseName) throws MetadataException, |
| RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName); |
| DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(false); |
| IValueExtractor<Dataset> valueExtractor = new MetadataEntityValueExtractor<Dataset>(tupleReaderWriter); |
| List<Dataset> results = new ArrayList<Dataset>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, searchKey, valueExtractor, results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public List<Feed> getDataverseFeeds(JobId jobId, String dataverseName) throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName); |
| FeedTupleTranslator tupleReaderWriter = new FeedTupleTranslator(false); |
| IValueExtractor<Feed> valueExtractor = new MetadataEntityValueExtractor<Feed>(tupleReaderWriter); |
| List<Feed> results = new ArrayList<Feed>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.FEED_DATASET, searchKey, valueExtractor, results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| public List<Library> getDataverseLibraries(JobId jobId, String dataverseName) throws MetadataException, |
| RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName); |
| LibraryTupleTranslator tupleReaderWriter = new LibraryTupleTranslator(false); |
| IValueExtractor<Library> valueExtractor = new MetadataEntityValueExtractor<Library>(tupleReaderWriter); |
| List<Library> results = new ArrayList<Library>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, searchKey, valueExtractor, results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| private List<Datatype> getDataverseDatatypes(JobId jobId, String dataverseName) throws MetadataException, |
| RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName); |
| DatatypeTupleTranslator tupleReaderWriter = new DatatypeTupleTranslator(jobId, this, false); |
| IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<Datatype>(tupleReaderWriter); |
| List<Datatype> results = new ArrayList<Datatype>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey, valueExtractor, results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public Dataset getDataset(JobId jobId, String dataverseName, String datasetName) throws MetadataException, |
| RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, datasetName); |
| DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(false); |
| List<Dataset> results = new ArrayList<Dataset>(); |
| IValueExtractor<Dataset> valueExtractor = new MetadataEntityValueExtractor<Dataset>(tupleReaderWriter); |
| searchIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, searchKey, valueExtractor, results); |
| if (results.isEmpty()) { |
| return null; |
| } |
| return results.get(0); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| private List<String> getDatasetNamesDeclaredByThisDatatype(JobId jobId, String dataverseName, String datatypeName) |
| throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, datatypeName); |
| List<String> results = new ArrayList<String>(); |
| IValueExtractor<String> valueExtractor = new DatasetNameValueExtractor(); |
| searchIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATASET_INDEX, searchKey, valueExtractor, |
| results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| public List<String> getDatatypeNamesUsingThisDatatype(JobId jobId, String dataverseName, String datatypeName) |
| throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, datatypeName); |
| List<String> results = new ArrayList<String>(); |
| IValueExtractor<String> valueExtractor = new DatatypeNameValueExtractor(dataverseName, this); |
| searchIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, searchKey, valueExtractor, |
| results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| private List<String> getNestedDatatypeNames(JobId jobId, String dataverseName, String datatypeName) |
| throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName); |
| List<String> results = new ArrayList<String>(); |
| IValueExtractor<String> valueExtractor = new NestedDatatypeNameValueExtractor(datatypeName); |
| searchIndex(jobId, MetadataSecondaryIndexes.DATATYPENAME_ON_DATATYPE_INDEX, searchKey, valueExtractor, |
| results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| public List<String> getDatasetNamesPartitionedOnThisNodeGroup(JobId jobId, String nodegroup) |
| throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(nodegroup); |
| List<String> results = new ArrayList<String>(); |
| IValueExtractor<String> valueExtractor = new DatasetNameValueExtractor(); |
| searchIndex(jobId, MetadataSecondaryIndexes.GROUPNAME_ON_DATASET_INDEX, searchKey, valueExtractor, results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public Index getIndex(JobId jobId, String dataverseName, String datasetName, String indexName) |
| throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, datasetName, indexName); |
| IndexTupleTranslator tupleReaderWriter = new IndexTupleTranslator(false); |
| IValueExtractor<Index> valueExtractor = new MetadataEntityValueExtractor<Index>(tupleReaderWriter); |
| List<Index> results = new ArrayList<Index>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, searchKey, valueExtractor, results); |
| if (results.isEmpty()) { |
| return null; |
| } |
| return results.get(0); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public List<Index> getDatasetIndexes(JobId jobId, String dataverseName, String datasetName) |
| throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, datasetName); |
| IndexTupleTranslator tupleReaderWriter = new IndexTupleTranslator(false); |
| IValueExtractor<Index> valueExtractor = new MetadataEntityValueExtractor<Index>(tupleReaderWriter); |
| List<Index> results = new ArrayList<Index>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.INDEX_DATASET, searchKey, valueExtractor, results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public Datatype getDatatype(JobId jobId, String dataverseName, String datatypeName) throws MetadataException, |
| RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, datatypeName); |
| DatatypeTupleTranslator tupleReaderWriter = new DatatypeTupleTranslator(jobId, this, false); |
| IValueExtractor<Datatype> valueExtractor = new MetadataEntityValueExtractor<Datatype>(tupleReaderWriter); |
| List<Datatype> results = new ArrayList<Datatype>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.DATATYPE_DATASET, searchKey, valueExtractor, results); |
| if (results.isEmpty()) { |
| return null; |
| } |
| return results.get(0); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public NodeGroup getNodeGroup(JobId jobId, String nodeGroupName) throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(nodeGroupName); |
| NodeGroupTupleTranslator tupleReaderWriter = new NodeGroupTupleTranslator(false); |
| IValueExtractor<NodeGroup> valueExtractor = new MetadataEntityValueExtractor<NodeGroup>(tupleReaderWriter); |
| List<NodeGroup> results = new ArrayList<NodeGroup>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.NODEGROUP_DATASET, searchKey, valueExtractor, results); |
| if (results.isEmpty()) { |
| return null; |
| } |
| return results.get(0); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public Function getFunction(JobId jobId, FunctionSignature functionSignature) throws MetadataException, |
| RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(functionSignature.getNamespace(), functionSignature.getName(), "" |
| + functionSignature.getArity()); |
| FunctionTupleTranslator tupleReaderWriter = new FunctionTupleTranslator(false); |
| List<Function> results = new ArrayList<Function>(); |
| IValueExtractor<Function> valueExtractor = new MetadataEntityValueExtractor<Function>(tupleReaderWriter); |
| searchIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, searchKey, valueExtractor, results); |
| if (results.isEmpty()) { |
| return null; |
| } |
| return results.get(0); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void dropFunction(JobId jobId, FunctionSignature functionSignature) throws MetadataException, |
| RemoteException { |
| |
| Function function; |
| try { |
| function = getFunction(jobId, functionSignature); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| if (function == null) { |
| throw new MetadataException("Cannot drop function '" + functionSignature.toString() |
| + "' because it doesn't exist."); |
| } |
| try { |
| // Delete entry from the 'function' dataset. |
| ITupleReference searchKey = createTuple(functionSignature.getNamespace(), functionSignature.getName(), "" |
| + functionSignature.getArity()); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the 'function' dataset. |
| ITupleReference functionTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, |
| searchKey); |
| deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, functionTuple); |
| |
| // TODO: Change this to be a BTree specific exception, e.g., |
| // BTreeKeyDoesNotExistException. |
| } catch (TreeIndexException e) { |
| throw new MetadataException("There is no function with the name " + functionSignature.getName() |
| + " and arity " + functionSignature.getArity(), e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| private ITupleReference getTupleToBeDeleted(JobId jobId, IMetadataIndex metadataIndex, ITupleReference searchKey) |
| throws Exception { |
| IValueExtractor<ITupleReference> valueExtractor = new TupleCopyValueExtractor(metadataIndex.getTypeTraits()); |
| List<ITupleReference> results = new ArrayList<ITupleReference>(); |
| searchIndex(jobId, metadataIndex, searchKey, valueExtractor, results); |
| if (results.isEmpty()) { |
| // TODO: Temporarily a TreeIndexException to make it get caught by |
| // caller in the appropriate catch block. |
| throw new TreeIndexException("Could not find entry to be deleted."); |
| } |
| // There should be exactly one result returned from the search. |
| return results.get(0); |
| } |
| |
| // Debugging Method |
| public String printMetadata() { |
| |
| StringBuilder sb = new StringBuilder(); |
| try { |
| IMetadataIndex index = MetadataPrimaryIndexes.DATAVERSE_DATASET; |
| long resourceID = index.getResourceID(); |
| IIndex indexInstance = indexLifecycleManager.getIndex(resourceID); |
| indexLifecycleManager.open(resourceID); |
| IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, |
| NoOpOperationCallback.INSTANCE); |
| ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); |
| |
| RangePredicate rangePred = null; |
| rangePred = new RangePredicate(null, null, true, true, null, null); |
| indexAccessor.search(rangeCursor, rangePred); |
| try { |
| while (rangeCursor.hasNext()) { |
| rangeCursor.next(); |
| sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), |
| new ISerializerDeserializer[] { AqlSerializerDeserializerProvider.INSTANCE |
| .getSerializerDeserializer(BuiltinType.ASTRING) })); |
| } |
| } finally { |
| rangeCursor.close(); |
| } |
| indexLifecycleManager.close(resourceID); |
| |
| index = MetadataPrimaryIndexes.DATASET_DATASET; |
| resourceID = index.getResourceID(); |
| indexInstance = indexLifecycleManager.getIndex(resourceID); |
| indexLifecycleManager.open(resourceID); |
| indexAccessor = indexInstance |
| .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); |
| rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); |
| |
| rangePred = null; |
| rangePred = new RangePredicate(null, null, true, true, null, null); |
| indexAccessor.search(rangeCursor, rangePred); |
| try { |
| while (rangeCursor.hasNext()) { |
| rangeCursor.next(); |
| sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] { |
| AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING), |
| AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) })); |
| } |
| } finally { |
| rangeCursor.close(); |
| } |
| indexLifecycleManager.close(resourceID); |
| |
| index = MetadataPrimaryIndexes.INDEX_DATASET; |
| resourceID = index.getResourceID(); |
| indexInstance = indexLifecycleManager.getIndex(resourceID); |
| indexLifecycleManager.open(resourceID); |
| indexAccessor = indexInstance |
| .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); |
| rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); |
| |
| rangePred = null; |
| rangePred = new RangePredicate(null, null, true, true, null, null); |
| indexAccessor.search(rangeCursor, rangePred); |
| try { |
| while (rangeCursor.hasNext()) { |
| rangeCursor.next(); |
| sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] { |
| AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING), |
| AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING), |
| AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING) })); |
| } |
| } finally { |
| rangeCursor.close(); |
| } |
| indexLifecycleManager.close(resourceID); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| return sb.toString(); |
| } |
| |
| private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey, |
| IValueExtractor<ResultType> valueExtractor, List<ResultType> results) throws Exception { |
| IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory(); |
| long resourceID = index.getResourceID(); |
| IIndex indexInstance = indexLifecycleManager.getIndex(resourceID); |
| indexLifecycleManager.open(resourceID); |
| IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, |
| NoOpOperationCallback.INSTANCE); |
| ITreeIndexCursor rangeCursor = (ITreeIndexCursor) indexAccessor.createSearchCursor(false); |
| |
| IBinaryComparator[] searchCmps = null; |
| MultiComparator searchCmp = null; |
| RangePredicate rangePred = null; |
| if (searchKey != null) { |
| searchCmps = new IBinaryComparator[searchKey.getFieldCount()]; |
| for (int i = 0; i < searchKey.getFieldCount(); i++) { |
| searchCmps[i] = comparatorFactories[i].createBinaryComparator(); |
| } |
| searchCmp = new MultiComparator(searchCmps); |
| } |
| rangePred = new RangePredicate(searchKey, searchKey, true, true, searchCmp, searchCmp); |
| indexAccessor.search(rangeCursor, rangePred); |
| |
| try { |
| while (rangeCursor.hasNext()) { |
| rangeCursor.next(); |
| ResultType result = valueExtractor.getValue(jobId, rangeCursor.getTuple()); |
| if (result != null) { |
| results.add(result); |
| } |
| } |
| } finally { |
| rangeCursor.close(); |
| } |
| indexLifecycleManager.close(resourceID); |
| } |
| |
| @Override |
| public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException { |
| int mostRecentDatasetId = MetadataPrimaryIndexes.FIRST_AVAILABLE_USER_DATASET_ID; |
| long resourceID = MetadataPrimaryIndexes.DATASET_DATASET.getResourceID(); |
| try { |
| IIndex indexInstance = indexLifecycleManager.getIndex(resourceID); |
| indexLifecycleManager.open(resourceID); |
| try { |
| IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE, |
| NoOpOperationCallback.INSTANCE); |
| IIndexCursor rangeCursor = indexAccessor.createSearchCursor(false); |
| |
| DatasetTupleTranslator tupleReaderWriter = new DatasetTupleTranslator(false); |
| IValueExtractor<Dataset> valueExtractor = new MetadataEntityValueExtractor<Dataset>(tupleReaderWriter); |
| RangePredicate rangePred = new RangePredicate(null, null, true, true, null, null); |
| |
| indexAccessor.search(rangeCursor, rangePred); |
| int datasetId; |
| |
| try { |
| while (rangeCursor.hasNext()) { |
| rangeCursor.next(); |
| final ITupleReference ref = rangeCursor.getTuple(); |
| final Dataset ds = valueExtractor.getValue(jobId, ref); |
| datasetId = ds.getDatasetId(); |
| if (mostRecentDatasetId < datasetId) { |
| mostRecentDatasetId = datasetId; |
| } |
| } |
| } finally { |
| rangeCursor.close(); |
| } |
| } finally { |
| indexLifecycleManager.close(resourceID); |
| } |
| |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| |
| DatasetIdFactory.initialize(mostRecentDatasetId); |
| } |
| |
| // TODO: Can use Hyrack's TupleUtils for this, once we switch to a newer |
| // Hyracks version. |
| public ITupleReference createTuple(String... fields) throws HyracksDataException { |
| ISerializerDeserializer<AString> stringSerde = AqlSerializerDeserializerProvider.INSTANCE |
| .getSerializerDeserializer(BuiltinType.ASTRING); |
| AMutableString aString = new AMutableString(""); |
| ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(fields.length); |
| for (String s : fields) { |
| aString.setValue(s); |
| stringSerde.serialize(aString, tupleBuilder.getDataOutput()); |
| tupleBuilder.addFieldEndOffset(); |
| } |
| ArrayTupleReference tuple = new ArrayTupleReference(); |
| tuple.reset(tupleBuilder.getFieldEndOffsets(), tupleBuilder.getByteArray()); |
| return tuple; |
| } |
| |
| @Override |
| public List<Function> getDataverseFunctions(JobId jobId, String dataverseName) throws MetadataException, |
| RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName); |
| FunctionTupleTranslator tupleReaderWriter = new FunctionTupleTranslator(false); |
| IValueExtractor<Function> valueExtractor = new MetadataEntityValueExtractor<Function>(tupleReaderWriter); |
| List<Function> results = new ArrayList<Function>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.FUNCTION_DATASET, searchKey, valueExtractor, results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void addAdapter(JobId jobId, DatasourceAdapter adapter) throws MetadataException, RemoteException { |
| try { |
| // Insert into the 'Adapter' dataset. |
| DatasourceAdapterTupleTranslator tupleReaderWriter = new DatasourceAdapterTupleTranslator(true); |
| ITupleReference adapterTuple = tupleReaderWriter.getTupleFromMetadataEntity(adapter); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, adapterTuple); |
| |
| } catch (TreeIndexDuplicateKeyException e) { |
| throw new MetadataException("A adapter with this name " + adapter.getAdapterIdentifier().getAdapterName() |
| + " already exists in dataverse '" + adapter.getAdapterIdentifier().getNamespace() + "'.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| |
| } |
| |
| @Override |
| public void dropAdapter(JobId jobId, String dataverseName, String adapterName) throws MetadataException, |
| RemoteException { |
| DatasourceAdapter adapter; |
| try { |
| adapter = getAdapter(jobId, dataverseName, adapterName); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| if (adapter == null) { |
| throw new MetadataException("Cannot drop adapter '" + adapter + "' because it doesn't exist."); |
| } |
| try { |
| // Delete entry from the 'Adapter' dataset. |
| ITupleReference searchKey = createTuple(dataverseName, adapterName); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the 'Adapter' dataset. |
| ITupleReference datasetTuple = getTupleToBeDeleted(jobId, |
| MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey); |
| deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, datasetTuple); |
| |
| // TODO: Change this to be a BTree specific exception, e.g., |
| // BTreeKeyDoesNotExistException. |
| } catch (TreeIndexException e) { |
| throw new MetadataException("Cannot drop adapter '" + adapterName, e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| |
| } |
| |
| @Override |
| public DatasourceAdapter getAdapter(JobId jobId, String dataverseName, String adapterName) |
| throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, adapterName); |
| DatasourceAdapterTupleTranslator tupleReaderWriter = new DatasourceAdapterTupleTranslator(false); |
| List<DatasourceAdapter> results = new ArrayList<DatasourceAdapter>(); |
| IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<DatasourceAdapter>( |
| tupleReaderWriter); |
| searchIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey, valueExtractor, results); |
| if (results.isEmpty()) { |
| return null; |
| } |
| return results.get(0); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void addCompactionPolicy(JobId jobId, CompactionPolicy compactionPolicy) throws MetadataException, |
| RemoteException { |
| try { |
| // Insert into the 'CompactionPolicy' dataset. |
| CompactionPolicyTupleTranslator tupleReaderWriter = new CompactionPolicyTupleTranslator(true); |
| ITupleReference compactionPolicyTuple = tupleReaderWriter.getTupleFromMetadataEntity(compactionPolicy); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, compactionPolicyTuple); |
| |
| } catch (TreeIndexDuplicateKeyException e) { |
| throw new MetadataException("A compcation policy with this name " + compactionPolicy.getPolicyName() |
| + " already exists in dataverse '" + compactionPolicy.getPolicyName() + "'.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| |
| } |
| |
| @Override |
| public CompactionPolicy getCompactionPolicy(JobId jobId, String dataverse, String policyName) |
| throws MetadataException, RemoteException { |
| |
| try { |
| ITupleReference searchKey = createTuple(dataverse, policyName); |
| CompactionPolicyTupleTranslator tupleReaderWriter = new CompactionPolicyTupleTranslator(false); |
| List<CompactionPolicy> results = new ArrayList<CompactionPolicy>(); |
| IValueExtractor<CompactionPolicy> valueExtractor = new MetadataEntityValueExtractor<CompactionPolicy>( |
| tupleReaderWriter); |
| searchIndex(jobId, MetadataPrimaryIndexes.COMPACTION_POLICY_DATASET, searchKey, valueExtractor, results); |
| if (!results.isEmpty()) { |
| return results.get(0); |
| } |
| return null; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public List<DatasourceAdapter> getDataverseAdapters(JobId jobId, String dataverseName) throws MetadataException, |
| RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName); |
| DatasourceAdapterTupleTranslator tupleReaderWriter = new DatasourceAdapterTupleTranslator(false); |
| IValueExtractor<DatasourceAdapter> valueExtractor = new MetadataEntityValueExtractor<DatasourceAdapter>( |
| tupleReaderWriter); |
| List<DatasourceAdapter> results = new ArrayList<DatasourceAdapter>(); |
| searchIndex(jobId, MetadataPrimaryIndexes.DATASOURCE_ADAPTER_DATASET, searchKey, valueExtractor, results); |
| return results; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void addLibrary(JobId jobId, Library library) throws MetadataException, RemoteException { |
| try { |
| // Insert into the 'Library' dataset. |
| LibraryTupleTranslator tupleReaderWriter = new LibraryTupleTranslator(true); |
| ITupleReference libraryTuple = tupleReaderWriter.getTupleFromMetadataEntity(library); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, libraryTuple); |
| |
| } catch (TreeIndexException e) { |
| throw new MetadataException("A library with this name " + library.getDataverseName() |
| + " already exists in dataverse '" + library.getDataverseName() + "'.", e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| |
| } |
| |
| @Override |
| public void dropLibrary(JobId jobId, String dataverseName, String libraryName) throws MetadataException, |
| RemoteException { |
| Library library; |
| try { |
| library = getLibrary(jobId, dataverseName, libraryName); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| if (library == null) { |
| throw new MetadataException("Cannot drop library '" + library + "' because it doesn't exist."); |
| } |
| try { |
| // Delete entry from the 'Library' dataset. |
| ITupleReference searchKey = createTuple(dataverseName, libraryName); |
| // Searches the index for the tuple to be deleted. Acquires an S |
| // lock on the 'Adapter' dataset. |
| ITupleReference datasetTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, searchKey); |
| deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, datasetTuple); |
| |
| // TODO: Change this to be a BTree specific exception, e.g., |
| // BTreeKeyDoesNotExistException. |
| } catch (TreeIndexException e) { |
| throw new MetadataException("Cannot drop library '" + libraryName, e); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| |
| } |
| |
| @Override |
| public Library getLibrary(JobId jobId, String dataverseName, String libraryName) throws MetadataException, |
| RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(dataverseName, libraryName); |
| LibraryTupleTranslator tupleReaderWriter = new LibraryTupleTranslator(false); |
| List<Library> results = new ArrayList<Library>(); |
| IValueExtractor<Library> valueExtractor = new MetadataEntityValueExtractor<Library>(tupleReaderWriter); |
| searchIndex(jobId, MetadataPrimaryIndexes.LIBRARY_DATASET, searchKey, valueExtractor, results); |
| if (results.isEmpty()) { |
| return null; |
| } |
| return results.get(0); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| public int getMostRecentDatasetId() throws MetadataException, RemoteException { |
| return DatasetIdFactory.getMostRecentDatasetId(); |
| } |
| |
| @Override |
| public void registerFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivity feedActivity) |
| throws MetadataException, RemoteException { |
| try { |
| if (!FeedActivityIdFactory.isInitialized()) { |
| initializeFeedActivityIdFactory(jobId); |
| } |
| feedActivity.setActivityId(FeedActivityIdFactory.generateFeedActivityId()); |
| FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true); |
| ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(feedActivity); |
| insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, tuple); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| |
| } |
| |
| @Override |
| public FeedActivity getRecentFeedActivity(JobId jobId, FeedConnectionId feedId, FeedActivityType... activityType) |
| throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(feedId.getDataverse(), feedId.getFeedName(), |
| feedId.getDatasetName()); |
| FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false); |
| List<FeedActivity> results = new ArrayList<FeedActivity>(); |
| IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>( |
| tupleReaderWriter); |
| searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results); |
| if (!results.isEmpty()) { |
| Collections.sort(results); |
| if (activityType == null) { |
| return results.get(0); |
| } else { |
| for (FeedActivity result : results) { |
| for (FeedActivityType ft : activityType) { |
| if (result.getActivityType().equals(ft)) { |
| return result; |
| } |
| } |
| } |
| } |
| } |
| return null; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| @Override |
| public void initializeFeedActivityIdFactory(JobId jobId) throws MetadataException, RemoteException { |
| try { |
| ITupleReference searchKey = createTuple(); |
| FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true); |
| List<FeedActivity> results = new ArrayList<FeedActivity>(); |
| IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>( |
| tupleReaderWriter); |
| searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results); |
| int maxActivityId = 0; |
| for (FeedActivity fa : results) { |
| if (maxActivityId < fa.getActivityId()) { |
| maxActivityId = fa.getActivityId(); |
| } |
| } |
| FeedActivityIdFactory.initialize(maxActivityId); |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| |
| } |
| |
| /** |
|  * Inserts the given feed policy into the {@code FEED_POLICY_DATASET} metadata index. |
|  * |
|  * @param jobId |
|  *            job id handed to the index insert |
|  * @param feedPolicy |
|  *            the policy to store |
|  * @throws MetadataException |
|  *             if the insert raises a {@code TreeIndexException} (reported as "already exists"), or wrapping |
|  *             any other exception as its cause |
|  */ |
| @Override |
| public void addFeedPolicy(JobId jobId, FeedPolicy feedPolicy) throws MetadataException, RemoteException { |
|     try { |
|         // Insert into the 'FeedPolicy' dataset. |
|         FeedPolicyTupleTranslator tupleReaderWriter = new FeedPolicyTupleTranslator(true); |
|         ITupleReference feedPolicyTuple = tupleReaderWriter.getTupleFromMetadataEntity(feedPolicy); |
|         insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, feedPolicyTuple); |
|     } catch (TreeIndexException e) { |
|         // Report the owning dataverse (not the policy name a second time) so the message says where the |
|         // clash occurred. |
|         throw new MetadataException("A feed policy with this name " + feedPolicy.getPolicyName() |
|                 + " already exists in dataverse '" + feedPolicy.getDataverseName() + "'.", e); |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| /** |
|  * Looks up a feed policy in the {@code FEED_POLICY_DATASET} metadata index. |
|  * |
|  * @param jobId |
|  *            job id handed to the index search |
|  * @param dataverse |
|  *            first field of the search key |
|  * @param policyName |
|  *            second field of the search key |
|  * @return the first policy returned by the search, or null when the search returns nothing |
|  * @throws MetadataException |
|  *             wrapping any exception raised during the lookup |
|  */ |
| @Override |
| public FeedPolicy getFeedPolicy(JobId jobId, String dataverse, String policyName) throws MetadataException, |
|         RemoteException { |
|     try { |
|         ITupleReference policyKey = createTuple(dataverse, policyName); |
|         FeedPolicyTupleTranslator translator = new FeedPolicyTupleTranslator(false); |
|         List<FeedPolicy> matches = new ArrayList<FeedPolicy>(); |
|         IValueExtractor<FeedPolicy> extractor = new MetadataEntityValueExtractor<FeedPolicy>(translator); |
|         searchIndex(jobId, MetadataPrimaryIndexes.FEED_POLICY_DATASET, policyKey, extractor, matches); |
|         return matches.isEmpty() ? null : matches.get(0); |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| /** |
|  * Collects the feed activities that mark feed connections as currently active. |
|  * <p> |
|  * All feed activity records are read and sorted by their natural order. While walking that order, a |
|  * {@code FEED_END} record marks its connection (dataverse, feed, dataset) as terminated. A {@code FEED_BEGIN} |
|  * record is kept for a connection only if no {@code FEED_END} for that connection was seen earlier in the |
|  * walk. When several {@code FEED_BEGIN} records qualify for one connection, the one with the highest activity |
|  * id is kept. |
|  * |
|  * @param jobId |
|  *            job id handed to the index search |
|  * @param dataverse |
|  *            dataverse to restrict to, or null; it only takes effect together with {@code dataset} |
|  * @param dataset |
|  *            dataset to restrict to, or null; must be null when {@code dataverse} is null |
|  * @return the kept {@code FEED_BEGIN} activities, one per active connection, in no particular order |
|  * @throws MetadataException |
|  *             if {@code dataset} is given without {@code dataverse}, or wrapping any exception raised while |
|  *             reading the activities |
|  */ |
| @Override |
| public List<FeedActivity> getActiveFeeds(JobId jobId, String dataverse, String dataset) throws MetadataException, |
|         RemoteException { |
|     if (dataverse == null && dataset != null) { |
|         throw new MetadataException("Invalid arguments " + dataverse + " " + dataset); |
|     } |
|     try { |
|         ITupleReference searchKey = createTuple(); |
|         FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(true); |
|         List<FeedActivity> results = new ArrayList<FeedActivity>(); |
|         IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>( |
|                 tupleReaderWriter); |
|         searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results); |
|
|         // Relies on FeedActivity's natural ordering; presumably this puts the most recent activity first. |
|         Collections.sort(results); |
|
|         Map<FeedConnectionId, FeedActivity> activeByConnection = new HashMap<FeedConnectionId, FeedActivity>(); |
|         Set<FeedConnectionId> terminatedFeeds = new HashSet<FeedConnectionId>(); |
|         for (FeedActivity fa : results) { |
|             // NOTE(review): when only the dataverse is supplied nothing is filtered out, so activities |
|             // from every dataverse are considered - confirm this is intended. |
|             if (dataverse != null && dataset != null |
|                     && (!fa.getDataverseName().equals(dataverse) || !dataset.equals(fa.getDatasetName()))) { |
|                 continue; |
|             } |
|
|             FeedConnectionId fid = new FeedConnectionId(fa.getDataverseName(), fa.getFeedName(), |
|                     fa.getDatasetName()); |
|             switch (fa.getActivityType()) { |
|                 case FEED_BEGIN: |
|                     if (!terminatedFeeds.contains(fid)) { |
|                         FeedActivity current = activeByConnection.get(fid); |
|                         if (current == null || fa.getActivityId() > current.getActivityId()) { |
|                             activeByConnection.put(fid, fa); |
|                         } |
|                     } |
|                     break; |
|                 case FEED_END: |
|                     terminatedFeeds.add(fid); |
|                     break; |
|                 default: |
|                     // other activity types do not affect whether a connection is active |
|                     break; |
|             } |
|         } |
|         // No console output here: results are returned to the caller only. |
|         return new ArrayList<FeedActivity>(activeByConnection.values()); |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| /** |
|  * Inserts the given feed into the {@code FEED_DATASET} metadata index. |
|  * |
|  * @param jobId |
|  *            job id handed to the index insert |
|  * @param feed |
|  *            the feed to store |
|  * @throws MetadataException |
|  *             if the insert raises a {@code TreeIndexException} (reported as "already exists"), or wrapping |
|  *             any other exception as its cause |
|  */ |
| @Override |
| public void addFeed(JobId jobId, Feed feed) throws MetadataException, RemoteException { |
|     try { |
|         // Translate the entity into a tuple, then store it in the 'Feed' dataset. |
|         FeedTupleTranslator translator = new FeedTupleTranslator(true); |
|         ITupleReference tupleToInsert = translator.getTupleFromMetadataEntity(feed); |
|         insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.FEED_DATASET, tupleToInsert); |
|     } catch (TreeIndexException duplicate) { |
|         String message = "A feed with this name " + feed.getFeedName() + " already exists in dataverse '" |
|                 + feed.getDataverseName() + "'."; |
|         throw new MetadataException(message, duplicate); |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| /** |
|  * Looks up a feed in the {@code FEED_DATASET} metadata index. |
|  * |
|  * @param jobId |
|  *            job id handed to the index search |
|  * @param dataverse |
|  *            first field of the search key |
|  * @param feedName |
|  *            second field of the search key |
|  * @return the first feed returned by the search, or null when the search returns nothing |
|  * @throws MetadataException |
|  *             wrapping any exception raised during the lookup |
|  */ |
| @Override |
| public Feed getFeed(JobId jobId, String dataverse, String feedName) throws MetadataException, RemoteException { |
|     try { |
|         ITupleReference feedKey = createTuple(dataverse, feedName); |
|         FeedTupleTranslator translator = new FeedTupleTranslator(false); |
|         List<Feed> matches = new ArrayList<Feed>(); |
|         IValueExtractor<Feed> extractor = new MetadataEntityValueExtractor<Feed>(translator); |
|         searchIndex(jobId, MetadataPrimaryIndexes.FEED_DATASET, feedKey, extractor, matches); |
|         return matches.isEmpty() ? null : matches.get(0); |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| /** |
|  * Removes a feed from the {@code FEED_DATASET} metadata index. |
|  * |
|  * @param jobId |
|  *            job id handed to the index operations |
|  * @param dataverse |
|  *            first field of the search key |
|  * @param feedName |
|  *            second field of the search key |
|  * @throws MetadataException |
|  *             if a {@code TreeIndexException} is raised (reported as the feed not existing), or wrapping any |
|  *             other exception as its cause |
|  */ |
| @Override |
| public void dropFeed(JobId jobId, String dataverse, String feedName) throws MetadataException, RemoteException { |
|     try { |
|         ITupleReference feedKey = createTuple(dataverse, feedName); |
|         // Fetch the stored tuple for this key from the 'Feed' dataset, then delete that tuple. |
|         ITupleReference storedTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.FEED_DATASET, feedKey); |
|         deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.FEED_DATASET, storedTuple); |
|     } catch (TreeIndexException missing) { |
|         // TODO: Change this to be a BTree specific exception, e.g., |
|         // BTreeKeyDoesNotExistException. |
|         throw new MetadataException("Cannot drop feed '" + feedName + "' because it doesn't exist", missing); |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| public List<FeedActivity> getDatasetsServedByFeed(JobId jobId, String dataverse, String feedName) |
| throws MetadataException, RemoteException { |
| List<FeedActivity> feedActivities = new ArrayList<FeedActivity>(); |
| try { |
| ITupleReference searchKey = createTuple(dataverse, feedName); |
| FeedActivityTupleTranslator tupleReaderWriter = new FeedActivityTupleTranslator(false); |
| List<FeedActivity> results = new ArrayList<FeedActivity>(); |
| IValueExtractor<FeedActivity> valueExtractor = new MetadataEntityValueExtractor<FeedActivity>( |
| tupleReaderWriter); |
| searchIndex(jobId, MetadataPrimaryIndexes.FEED_ACTIVITY_DATASET, searchKey, valueExtractor, results); |
| |
| if (!results.isEmpty()) { |
| Collections.sort(results); // most recent feed activity |
| Set<String> terminatedDatasets = new HashSet<String>(); |
| Set<String> activeDatasets = new HashSet<String>(); |
| |
| for (FeedActivity result : results) { |
| switch (result.getFeedActivityType()) { |
| case FEED_BEGIN: |
| if (!terminatedDatasets.contains(result.getDatasetName())) { |
| feedActivities.add(result); |
| activeDatasets.add(result.getDatasetName()); |
| } |
| break; |
| case FEED_END: |
| if (!activeDatasets.contains(result.getDatasetName())) { |
| terminatedDatasets.add(result.getDatasetName()); |
| } |
| break; |
| } |
| |
| } |
| } |
| return feedActivities; |
| } catch (Exception e) { |
| throw new MetadataException(e); |
| } |
| } |
| |
| /** |
|  * Inserts the given external file record into the {@code EXTERNAL_FILE_DATASET} metadata index. |
|  * |
|  * @param jobId |
|  *            job id handed to the index insert |
|  * @param externalFile |
|  *            the record to store |
|  * @throws MetadataException |
|  *             if the insert raises a {@code TreeIndexDuplicateKeyException} (reported as "already exists"), |
|  *             or wrapping any other exception as its cause |
|  */ |
| @Override |
| public void addExternalFile(JobId jobId, ExternalFile externalFile) throws MetadataException, RemoteException { |
|     try { |
|         // Translate the entity into a tuple, then store it in the 'externalFiles' dataset. |
|         ExternalFileTupleTranslator translator = new ExternalFileTupleTranslator(true); |
|         ITupleReference tupleToInsert = translator.getTupleFromMetadataEntity(externalFile); |
|         insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, tupleToInsert); |
|     } catch (TreeIndexDuplicateKeyException duplicate) { |
|         String message = "An externalFile with this number " + externalFile.getFileNumber() |
|                 + " already exists in dataset '" + externalFile.getDatasetName() + "' in dataverse '" |
|                 + externalFile.getDataverseName() + "'."; |
|         throw new MetadataException(message, duplicate); |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| /** |
|  * Reads the external file records of a dataset from the {@code EXTERNAL_FILE_DATASET} metadata index. |
|  * |
|  * @param jobId |
|  *            job id handed to the index search |
|  * @param dataset |
|  *            dataset whose dataverse name and dataset name form the search key |
|  * @return every record returned by the search; empty when there is none |
|  * @throws MetadataException |
|  *             wrapping any exception raised during the search |
|  */ |
| @Override |
| public List<ExternalFile> getExternalFiles(JobId jobId, Dataset dataset) throws MetadataException, RemoteException { |
|     try { |
|         ITupleReference datasetKey = createTuple(dataset.getDataverseName(), dataset.getDatasetName()); |
|         ExternalFileTupleTranslator translator = new ExternalFileTupleTranslator(false); |
|         IValueExtractor<ExternalFile> extractor = new MetadataEntityValueExtractor<ExternalFile>(translator); |
|         List<ExternalFile> files = new ArrayList<ExternalFile>(); |
|         searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, datasetKey, extractor, files); |
|         return files; |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| /** |
|  * Removes one external file record from the {@code EXTERNAL_FILE_DATASET} metadata index. |
|  * |
|  * @param jobId |
|  *            job id handed to the index operations |
|  * @param dataverseName |
|  *            first field of the search key |
|  * @param datasetName |
|  *            second field of the search key |
|  * @param fileNumber |
|  *            third field of the search key |
|  * @throws MetadataException |
|  *             if a {@code TreeIndexException} is raised, or wrapping any other exception as its cause |
|  */ |
| @Override |
| public void dropExternalFile(JobId jobId, String dataverseName, String datasetName, int fileNumber) |
|         throws MetadataException, RemoteException { |
|     try { |
|         // Build the three-field key, fetch the stored tuple for it, then delete that tuple from the |
|         // 'ExternalFile' dataset. |
|         ITupleReference fileKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber); |
|         ITupleReference storedTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, |
|                 fileKey); |
|         deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, storedTuple); |
|     } catch (TreeIndexException indexFailure) { |
|         throw new MetadataException("Couldn't drop externalFile.", indexFailure); |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| /** |
|  * Removes every external file record of the given dataset, one record at a time. |
|  * |
|  * @param jobId |
|  *            job id handed to the lookup and to each delete |
|  * @param dataset |
|  *            dataset whose external file records are dropped |
|  * @throws MetadataException |
|  *             wrapping any exception raised while listing or deleting the records |
|  */ |
| @Override |
| public void dropExternalFiles(JobId jobId, Dataset dataset) throws MetadataException, RemoteException { |
|     try { |
|         // List the records first, then drop each of them by its own key. |
|         List<ExternalFile> files = getExternalFiles(jobId, dataset); |
|         for (ExternalFile file : files) { |
|             dropExternalFile(jobId, file.getDataverseName(), file.getDatasetName(), file.getFileNumber()); |
|         } |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| /** |
|  * Builds the three-field search tuple for an external file record. A dedicated builder is needed because the |
|  * third field is an int value rather than a string. |
|  * |
|  * @param dataverseName |
|  *            serialized as the first (string) field |
|  * @param datasetName |
|  *            serialized as the second (string) field |
|  * @param fileNumber |
|  *            serialized as the third (int32) field |
|  * @return a tuple reference over the serialized fields |
|  * @throws HyracksDataException |
|  *             declared for failures while serializing a field |
|  */ |
| @SuppressWarnings("unchecked") |
| public ITupleReference createExternalFileSearchTuple(String dataverseName, String datasetName, int fileNumber) |
|         throws HyracksDataException { |
|     ISerializerDeserializer<AString> stringSerializer = AqlSerializerDeserializerProvider.INSTANCE |
|             .getSerializerDeserializer(BuiltinType.ASTRING); |
|     ISerializerDeserializer<AInt32> int32Serializer = AqlSerializerDeserializerProvider.INSTANCE |
|             .getSerializerDeserializer(BuiltinType.AINT32); |
|
|     // One mutable string holder is reused for both string fields. |
|     AMutableString holder = new AMutableString(""); |
|     ArrayTupleBuilder builder = new ArrayTupleBuilder(3); |
|
|     // field 0: dataverse |
|     holder.setValue(dataverseName); |
|     stringSerializer.serialize(holder, builder.getDataOutput()); |
|     builder.addFieldEndOffset(); |
|
|     // field 1: dataset |
|     holder.setValue(datasetName); |
|     stringSerializer.serialize(holder, builder.getDataOutput()); |
|     builder.addFieldEndOffset(); |
|
|     // field 2: file number |
|     int32Serializer.serialize(new AInt32(fileNumber), builder.getDataOutput()); |
|     builder.addFieldEndOffset(); |
|
|     ArrayTupleReference searchTuple = new ArrayTupleReference(); |
|     searchTuple.reset(builder.getFieldEndOffsets(), builder.getByteArray()); |
|     return searchTuple; |
| } |
| |
| /** |
|  * Looks up a single external file record in the {@code EXTERNAL_FILE_DATASET} metadata index. |
|  * |
|  * @param jobId |
|  *            job id handed to the index search |
|  * @param dataverseName |
|  *            first field of the search key |
|  * @param datasetName |
|  *            second field of the search key |
|  * @param fileNumber |
|  *            third field of the search key; unboxed when the key is built |
|  * @return the first record returned by the search, or null when the search returns nothing |
|  * @throws MetadataException |
|  *             wrapping any exception raised during the lookup |
|  */ |
| @Override |
| public ExternalFile getExternalFile(JobId jobId, String dataverseName, String datasetName, Integer fileNumber) |
|         throws MetadataException, RemoteException { |
|     try { |
|         ITupleReference fileKey = createExternalFileSearchTuple(dataverseName, datasetName, fileNumber); |
|         ExternalFileTupleTranslator translator = new ExternalFileTupleTranslator(false); |
|         IValueExtractor<ExternalFile> extractor = new MetadataEntityValueExtractor<ExternalFile>(translator); |
|         List<ExternalFile> matches = new ArrayList<ExternalFile>(); |
|         searchIndex(jobId, MetadataPrimaryIndexes.EXTERNAL_FILE_DATASET, fileKey, extractor, matches); |
|         return matches.isEmpty() ? null : matches.get(0); |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| |
| /** |
|  * Replaces the stored entry of a dataset in the {@code DATASET_DATASET} metadata index: the existing tuple |
|  * for (dataverse name, dataset name) is deleted and a tuple built from {@code dataset} is inserted. |
|  * |
|  * @param jobId |
|  *            job id handed to the index operations |
|  * @param dataset |
|  *            the new state of the dataset; also supplies the key of the entry to replace |
|  * @throws MetadataException |
|  *             wrapping any exception raised by the delete or the insert |
|  */ |
| @Override |
| public void updateDataset(JobId jobId, Dataset dataset) throws MetadataException, RemoteException { |
|     try { |
|         // Step 1: remove the previous entry from the 'dataset' dataset. |
|         ITupleReference datasetKey = createTuple(dataset.getDataverseName(), dataset.getDatasetName()); |
|         ITupleReference previousTuple = getTupleToBeDeleted(jobId, MetadataPrimaryIndexes.DATASET_DATASET, |
|                 datasetKey); |
|         deleteTupleFromIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, previousTuple); |
|
|         // Step 2: insert the tuple built from the updated entity. |
|         DatasetTupleTranslator translator = new DatasetTupleTranslator(true); |
|         ITupleReference replacementTuple = translator.getTupleFromMetadataEntity(dataset); |
|         insertTupleIntoIndex(jobId, MetadataPrimaryIndexes.DATASET_DATASET, replacementTuple); |
|     } catch (Exception e) { |
|         throw new MetadataException(e); |
|     } |
| } |
| } |