/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.metadata;
import java.rmi.RemoteException;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import edu.uci.ics.asterix.common.config.AsterixMetadataProperties;
import edu.uci.ics.asterix.common.exceptions.ACIDException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.common.feeds.FeedConnectionId;
import edu.uci.ics.asterix.common.functions.FunctionSignature;
import edu.uci.ics.asterix.common.transactions.JobId;
import edu.uci.ics.asterix.metadata.api.IAsterixStateProxy;
import edu.uci.ics.asterix.metadata.api.IMetadataManager;
import edu.uci.ics.asterix.metadata.api.IMetadataNode;
import edu.uci.ics.asterix.metadata.entities.CompactionPolicy;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.DatasourceAdapter;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalFile;
import edu.uci.ics.asterix.metadata.entities.Feed;
import edu.uci.ics.asterix.metadata.entities.FeedActivity;
import edu.uci.ics.asterix.metadata.entities.FeedActivity.FeedActivityType;
import edu.uci.ics.asterix.metadata.entities.FeedPolicy;
import edu.uci.ics.asterix.metadata.entities.Function;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.Library;
import edu.uci.ics.asterix.metadata.entities.Node;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.transaction.management.service.transaction.JobIdFactory;
import edu.uci.ics.hyracks.api.client.IHyracksClientConnection;
/**
 * Provides access to Asterix metadata via remote methods to the metadata node.
 * This metadata manager maintains a local cache of metadata Java objects
 * received from the metadata node, to avoid contacting the metadata node
 * repeatedly. We assume that this metadata manager is the only metadata manager
 * in an Asterix cluster. Therefore, no separate cache-invalidation mechanism is
 * needed at this point.
 * <p>
 * Assumptions/Limitations:
 * <ul>
 * <li>The metadata subsystem is started during NC Bootstrap start, i.e., when
 * Asterix is deployed, and destroyed in NC Bootstrap end, i.e., when Asterix is
 * undeployed.</li>
 * <li>The metadata subsystem consists of the MetadataManager and the
 * MetadataNode. The MetadataManager provides users access to the metadata. The
 * MetadataNode implements direct access to the storage layer on behalf of the
 * MetadataManager, and translates the binary representation of ADM into Java
 * objects for consumption by the MetadataManager's users.</li>
 * <li>There is exactly one instance of the MetadataManager and of the
 * MetadataNode in the cluster, which may or may not be co-located on the same
 * machine (or in the same JVM). The MetadataManager exists in the same JVM as
 * its users (e.g., the query compiler). The MetadataNode exists in the same JVM
 * as its transactional components (LockManager, LogManager, etc.).</li>
 * <li>Users shall access the metadata only through the MetadataManager, and
 * never via the MetadataNode directly.</li>
 * <li>Multiple threads may issue requests to the MetadataManager concurrently.
 * For the sake of accessing metadata, we assume a transaction consists of one
 * thread.</li>
 * <li>Users are responsible for locking the metadata (using the
 * MetadataManager API) before issuing requests. The MetadataNode is responsible
 * for acquiring finer-grained locks on behalf of requests from the
 * MetadataManager. Currently, locks are acquired per BTree, since the BTree
 * does not yet acquire finer-grained locks internally.</li>
 * <li>The metadata can be queried with AQL DML like any other dataset, but can
 * only be changed with AQL DDL.</li>
 * <li>The transaction ids for metadata transactions must be unique across the
 * cluster, i.e., metadata transaction ids shall never "accidentally" overlap
 * with transaction ids of regular jobs or other metadata transactions.</li>
 * </ul>
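 * <p>
 * For illustration only, a typical cached metadata read follows the pattern
 * sketched below (the dataverse and dataset names are hypothetical; all calls
 * are methods declared in this class):
 *
 * <pre>
 * MetadataManager.INSTANCE.acquireReadLatch();
 * MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
 * try {
 *     Dataset ds = MetadataManager.INSTANCE.getDataset(mdTxnCtx, "MyDataverse", "MyDataset");
 *     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
 * } catch (Exception e) {
 *     MetadataManager.INSTANCE.abortTransaction(mdTxnCtx);
 *     throw e;
 * } finally {
 *     MetadataManager.INSTANCE.releaseReadLatch();
 * }
 * </pre>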
*/
public class MetadataManager implements IMetadataManager {
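    // Backoff parameters for init(): after each failed attempt to obtain the
    // metadata node from the proxy, the sleep time starts at INITIAL_SLEEP_TIME
    // ms and is multiplied by RETRY_MULTIPLIER, for at most MAX_RETRY_COUNT
    // attempts (i.e., 64 ms, 256 ms, ..., up to about 65 s).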
private static final int INITIAL_SLEEP_TIME = 64;
private static final int RETRY_MULTIPLIER = 4;
private static final int MAX_RETRY_COUNT = 6;
// Set in init().
public static MetadataManager INSTANCE;
private final MetadataCache cache = new MetadataCache();
private IAsterixStateProxy proxy;
private IMetadataNode metadataNode;
private final ReadWriteLock metadataLatch;
private final AsterixMetadataProperties metadataProperties;
private IHyracksClientConnection hcc;
public MetadataManager(IAsterixStateProxy proxy, AsterixMetadataProperties metadataProperties) {
if (proxy == null) {
throw new Error("Null proxy given to MetadataManager.");
}
this.proxy = proxy;
this.metadataProperties = metadataProperties;
this.metadataNode = null;
this.metadataLatch = new ReentrantReadWriteLock(true);
}
public MetadataManager(IAsterixStateProxy proxy, IMetadataNode metadataNode) {
if (metadataNode == null) {
throw new Error("Null metadataNode given to MetadataManager.");
}
this.proxy = proxy;
this.metadataProperties = null;
this.metadataNode = metadataNode;
this.metadataLatch = new ReentrantReadWriteLock(true);
}
@Override
public void init() throws RemoteException, MetadataException {
// Could be synchronized on any object. Arbitrarily chose proxy.
synchronized (proxy) {
if (metadataNode != null) {
return;
}
try {
int retry = 0;
int sleep = INITIAL_SLEEP_TIME;
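                // Poll the proxy with exponential backoff until the metadata
                // node has registered itself or the retry budget is exhausted.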
while (retry++ < MAX_RETRY_COUNT) {
metadataNode = proxy.getMetadataNode();
if (metadataNode != null) {
break;
}
Thread.sleep(sleep);
sleep *= RETRY_MULTIPLIER;
}
} catch (InterruptedException e) {
throw new MetadataException(e);
}
if (metadataNode == null) {
throw new Error("Failed to get the MetadataNode.\n" + "The MetadataNode was configured to run on NC: "
+ metadataProperties.getMetadataNodeName());
}
}
}
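    // Each metadata transaction runs under a fresh JobId from the JobIdFactory,
    // which is expected to keep metadata transaction ids distinct from those of
    // regular jobs (see the class comment).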
@Override
public MetadataTransactionContext beginTransaction() throws RemoteException, ACIDException {
JobId jobId = JobIdFactory.generateJobId();
metadataNode.beginTransaction(jobId);
return new MetadataTransactionContext(jobId);
}
@Override
public void commitTransaction(MetadataTransactionContext ctx) throws RemoteException, ACIDException {
metadataNode.commitTransaction(ctx.getJobId());
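        // The remote commit succeeded; apply the transaction's staged
        // additions and drops to the local cache.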
cache.commit(ctx);
}
@Override
public void abortTransaction(MetadataTransactionContext ctx) throws RemoteException, ACIDException {
metadataNode.abortTransaction(ctx.getJobId());
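        // No local cache cleanup is needed: the transaction's changes were only
        // staged in its context, which is discarded on abort.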
}
@Override
public void lock(MetadataTransactionContext ctx, byte lockMode) throws RemoteException, ACIDException {
metadataNode.lock(ctx.getJobId(), lockMode);
}
@Override
public void unlock(MetadataTransactionContext ctx, byte lockMode) throws RemoteException, ACIDException {
metadataNode.unlock(ctx.getJobId(), lockMode);
}
@Override
public void addDataverse(MetadataTransactionContext ctx, Dataverse dataverse) throws MetadataException {
try {
metadataNode.addDataverse(ctx.getJobId(), dataverse);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.addDataverse(dataverse);
}
@Override
public void dropDataverse(MetadataTransactionContext ctx, String dataverseName) throws MetadataException {
try {
metadataNode.dropDataverse(ctx.getJobId(), dataverseName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.dropDataverse(dataverseName);
}
@Override
public List<Dataverse> getDataverses(MetadataTransactionContext ctx) throws MetadataException {
try {
return metadataNode.getDataverses(ctx.getJobId());
} catch (RemoteException e) {
throw new MetadataException(e);
}
}
@Override
public Dataverse getDataverse(MetadataTransactionContext ctx, String dataverseName) throws MetadataException {
// First look in the context to see if this transaction created the
// requested dataverse itself (but the dataverse is still uncommitted).
Dataverse dataverse = ctx.getDataverse(dataverseName);
if (dataverse != null) {
// Don't add this dataverse to the cache, since it is still
// uncommitted.
return dataverse;
}
if (ctx.dataverseIsDropped(dataverseName)) {
// Dataverse has been dropped by this transaction but could still be
// in the cache.
return null;
}
dataverse = cache.getDataverse(dataverseName);
if (dataverse != null) {
// Dataverse is already in the cache, don't add it again.
return dataverse;
}
try {
dataverse = metadataNode.getDataverse(ctx.getJobId(), dataverseName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
// We fetched the dataverse from the MetadataNode. Add it to the cache
// when this transaction commits.
if (dataverse != null) {
ctx.addDataverse(dataverse);
}
return dataverse;
}
@Override
public List<Dataset> getDataverseDatasets(MetadataTransactionContext ctx, String dataverseName)
throws MetadataException {
List<Dataset> dataverseDatasets;
try {
// Assuming that the transaction can read its own writes on the
// metadata node.
dataverseDatasets = metadataNode.getDataverseDatasets(ctx.getJobId(), dataverseName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
// Don't update the cache to avoid checking against the transaction's
// uncommitted datasets.
return dataverseDatasets;
}
@Override
public void addDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
        // Add the dataset to the metadata node.
try {
metadataNode.addDataset(ctx.getJobId(), dataset);
} catch (RemoteException e) {
throw new MetadataException(e);
}
        // Reflect the dataset in the cache when this transaction commits.
ctx.addDataset(dataset);
}
@Override
public void dropDataset(MetadataTransactionContext ctx, String dataverseName, String datasetName)
throws MetadataException {
try {
metadataNode.dropDataset(ctx.getJobId(), dataverseName, datasetName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.dropDataset(dataverseName, datasetName);
}
@Override
public Dataset getDataset(MetadataTransactionContext ctx, String dataverseName, String datasetName)
throws MetadataException {
// First look in the context to see if this transaction created the
// requested dataset itself (but the dataset is still uncommitted).
Dataset dataset = ctx.getDataset(dataverseName, datasetName);
if (dataset != null) {
            // Don't add this dataset to the cache, since it is still
            // uncommitted.
return dataset;
}
if (ctx.datasetIsDropped(dataverseName, datasetName)) {
// Dataset has been dropped by this transaction but could still be
// in the cache.
return null;
}
dataset = cache.getDataset(dataverseName, datasetName);
if (dataset != null) {
// Dataset is already in the cache, don't add it again.
return dataset;
}
try {
dataset = metadataNode.getDataset(ctx.getJobId(), dataverseName, datasetName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
// We fetched the dataset from the MetadataNode. Add it to the cache
// when this transaction commits.
if (dataset != null) {
ctx.addDataset(dataset);
}
return dataset;
}
@Override
public List<Index> getDatasetIndexes(MetadataTransactionContext ctx, String dataverseName, String datasetName)
throws MetadataException {
        List<Index> datasetIndexes;
        try {
            datasetIndexes = metadataNode.getDatasetIndexes(ctx.getJobId(), dataverseName, datasetName);
        } catch (RemoteException e) {
            throw new MetadataException(e);
        }
        return datasetIndexes;
}
@Override
public void addCompactionPolicy(MetadataTransactionContext mdTxnCtx, CompactionPolicy compactionPolicy)
throws MetadataException {
try {
metadataNode.addCompactionPolicy(mdTxnCtx.getJobId(), compactionPolicy);
} catch (RemoteException e) {
throw new MetadataException(e);
}
mdTxnCtx.addCompactionPolicy(compactionPolicy);
}
@Override
public CompactionPolicy getCompactionPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
throws MetadataException {
CompactionPolicy compactionPolicy = null;
try {
compactionPolicy = metadataNode.getCompactionPolicy(ctx.getJobId(), dataverse, policyName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
return compactionPolicy;
}
@Override
public void addDatatype(MetadataTransactionContext ctx, Datatype datatype) throws MetadataException {
try {
metadataNode.addDatatype(ctx.getJobId(), datatype);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.addDatatype(datatype);
}
@Override
public void dropDatatype(MetadataTransactionContext ctx, String dataverseName, String datatypeName)
throws MetadataException {
try {
metadataNode.dropDatatype(ctx.getJobId(), dataverseName, datatypeName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.dropDataDatatype(dataverseName, datatypeName);
}
@Override
public Datatype getDatatype(MetadataTransactionContext ctx, String dataverseName, String datatypeName)
throws MetadataException {
// First look in the context to see if this transaction created the
// requested datatype itself (but the datatype is still uncommitted).
Datatype datatype = ctx.getDatatype(dataverseName, datatypeName);
if (datatype != null) {
            // Don't add this datatype to the cache, since it is still
            // uncommitted.
return datatype;
}
if (ctx.datatypeIsDropped(dataverseName, datatypeName)) {
// Datatype has been dropped by this transaction but could still be
// in the cache.
return null;
}
datatype = cache.getDatatype(dataverseName, datatypeName);
if (datatype != null) {
// Datatype is already in the cache, don't add it again.
try {
//create a new Datatype object with a new ARecordType object in order to avoid
//concurrent access to UTF8StringPointable comparator in ARecordType object.
//see issue 510
ARecordType aRecType = (ARecordType) datatype.getDatatype();
return new Datatype(datatype.getDataverseName(), datatype.getDatatypeName(), new ARecordType(
aRecType.getTypeName(), aRecType.getFieldNames(), aRecType.getFieldTypes(), aRecType.isOpen()),
datatype.getIsAnonymous());
} catch (AsterixException e) {
throw new MetadataException(e);
}
}
try {
datatype = metadataNode.getDatatype(ctx.getJobId(), dataverseName, datatypeName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
// We fetched the datatype from the MetadataNode. Add it to the cache
// when this transaction commits.
if (datatype != null) {
ctx.addDatatype(datatype);
}
return datatype;
}
@Override
public void addIndex(MetadataTransactionContext ctx, Index index) throws MetadataException {
try {
metadataNode.addIndex(ctx.getJobId(), index);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.addIndex(index);
}
@Override
public void addAdapter(MetadataTransactionContext mdTxnCtx, DatasourceAdapter adapter) throws MetadataException {
try {
metadataNode.addAdapter(mdTxnCtx.getJobId(), adapter);
} catch (RemoteException e) {
throw new MetadataException(e);
}
mdTxnCtx.addAdapter(adapter);
}
@Override
public void dropIndex(MetadataTransactionContext ctx, String dataverseName, String datasetName, String indexName)
throws MetadataException {
try {
metadataNode.dropIndex(ctx.getJobId(), dataverseName, datasetName, indexName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.dropIndex(dataverseName, datasetName, indexName);
}
@Override
public Index getIndex(MetadataTransactionContext ctx, String dataverseName, String datasetName, String indexName)
throws MetadataException {
// First look in the context to see if this transaction created the
// requested index itself (but the index is still uncommitted).
Index index = ctx.getIndex(dataverseName, datasetName, indexName);
if (index != null) {
// Don't add this index to the cache, since it is still
// uncommitted.
return index;
}
if (ctx.indexIsDropped(dataverseName, datasetName, indexName)) {
// Index has been dropped by this transaction but could still be
// in the cache.
return null;
}
index = cache.getIndex(dataverseName, datasetName, indexName);
if (index != null) {
// Index is already in the cache, don't add it again.
return index;
}
try {
index = metadataNode.getIndex(ctx.getJobId(), dataverseName, datasetName, indexName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
// We fetched the index from the MetadataNode. Add it to the cache
// when this transaction commits.
if (index != null) {
ctx.addIndex(index);
}
return index;
}
@Override
public void addNode(MetadataTransactionContext ctx, Node node) throws MetadataException {
try {
metadataNode.addNode(ctx.getJobId(), node);
} catch (RemoteException e) {
throw new MetadataException(e);
}
}
@Override
public void addNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws MetadataException {
try {
metadataNode.addNodeGroup(ctx.getJobId(), nodeGroup);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.addNogeGroup(nodeGroup);
}
@Override
public void dropNodegroup(MetadataTransactionContext ctx, String nodeGroupName) throws MetadataException {
try {
metadataNode.dropNodegroup(ctx.getJobId(), nodeGroupName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.dropNodeGroup(nodeGroupName);
}
@Override
public NodeGroup getNodegroup(MetadataTransactionContext ctx, String nodeGroupName) throws MetadataException {
        // First look in the context to see if this transaction created the
        // requested node group itself (but the node group is still uncommitted).
NodeGroup nodeGroup = ctx.getNodeGroup(nodeGroupName);
if (nodeGroup != null) {
            // Don't add this node group to the cache, since it is still
            // uncommitted.
return nodeGroup;
}
if (ctx.nodeGroupIsDropped(nodeGroupName)) {
// NodeGroup has been dropped by this transaction but could still be
// in the cache.
return null;
}
nodeGroup = cache.getNodeGroup(nodeGroupName);
if (nodeGroup != null) {
// NodeGroup is already in the cache, don't add it again.
return nodeGroup;
}
try {
nodeGroup = metadataNode.getNodeGroup(ctx.getJobId(), nodeGroupName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
// We fetched the nodeGroup from the MetadataNode. Add it to the cache
// when this transaction commits.
if (nodeGroup != null) {
ctx.addNogeGroup(nodeGroup);
}
return nodeGroup;
}
@Override
public void addFunction(MetadataTransactionContext mdTxnCtx, Function function) throws MetadataException {
try {
metadataNode.addFunction(mdTxnCtx.getJobId(), function);
} catch (RemoteException e) {
throw new MetadataException(e);
}
mdTxnCtx.addFunction(function);
}
@Override
public void dropFunction(MetadataTransactionContext ctx, FunctionSignature functionSignature)
throws MetadataException {
try {
metadataNode.dropFunction(ctx.getJobId(), functionSignature);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.dropFunction(functionSignature);
}
@Override
public Function getFunction(MetadataTransactionContext ctx, FunctionSignature functionSignature)
throws MetadataException {
        // First look in the context to see if this transaction created the
        // requested function itself (but the function is still uncommitted).
Function function = ctx.getFunction(functionSignature);
if (function != null) {
            // Don't add this function to the cache, since it is still
            // uncommitted.
return function;
}
if (ctx.functionIsDropped(functionSignature)) {
// Function has been dropped by this transaction but could still be
// in the cache.
return null;
}
if (ctx.getDataverse(functionSignature.getNamespace()) != null) {
// This transaction has dropped and subsequently created the same
// dataverse.
return null;
}
function = cache.getFunction(functionSignature);
if (function != null) {
// Function is already in the cache, don't add it again.
return function;
}
try {
function = metadataNode.getFunction(ctx.getJobId(), functionSignature);
} catch (RemoteException e) {
throw new MetadataException(e);
}
// We fetched the function from the MetadataNode. Add it to the cache
// when this transaction commits.
if (function != null) {
ctx.addFunction(function);
}
return function;
}
@Override
public void addFeedPolicy(MetadataTransactionContext mdTxnCtx, FeedPolicy feedPolicy) throws MetadataException {
try {
metadataNode.addFeedPolicy(mdTxnCtx.getJobId(), feedPolicy);
} catch (RemoteException e) {
throw new MetadataException(e);
}
mdTxnCtx.addFeedPolicy(feedPolicy);
}
@Override
public void initializeDatasetIdFactory(MetadataTransactionContext ctx) throws MetadataException {
try {
metadataNode.initializeDatasetIdFactory(ctx.getJobId());
} catch (RemoteException e) {
throw new MetadataException(e);
}
}
@Override
public int getMostRecentDatasetId() throws MetadataException {
try {
return metadataNode.getMostRecentDatasetId();
} catch (RemoteException e) {
throw new MetadataException(e);
}
}
@Override
public List<Function> getDataverseFunctions(MetadataTransactionContext ctx, String dataverseName)
throws MetadataException {
List<Function> dataverseFunctions;
try {
// Assuming that the transaction can read its own writes on the
// metadata node.
dataverseFunctions = metadataNode.getDataverseFunctions(ctx.getJobId(), dataverseName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
// Don't update the cache to avoid checking against the transaction's
// uncommitted functions.
return dataverseFunctions;
}
@Override
public void dropAdapter(MetadataTransactionContext ctx, String dataverseName, String name) throws MetadataException {
try {
metadataNode.dropAdapter(ctx.getJobId(), dataverseName, name);
} catch (RemoteException e) {
throw new MetadataException(e);
}
}
@Override
public DatasourceAdapter getAdapter(MetadataTransactionContext ctx, String dataverseName, String name)
throws MetadataException {
DatasourceAdapter adapter = null;
try {
adapter = metadataNode.getAdapter(ctx.getJobId(), dataverseName, name);
} catch (RemoteException e) {
throw new MetadataException(e);
}
return adapter;
}
@Override
public void registerFeedActivity(MetadataTransactionContext ctx, FeedConnectionId feedId, FeedActivity feedActivity)
throws MetadataException {
try {
metadataNode.registerFeedActivity(ctx.getJobId(), feedId, feedActivity);
} catch (RemoteException e) {
throw new MetadataException(e);
}
}
@Override
public FeedActivity getRecentActivityOnFeedConnection(MetadataTransactionContext ctx, FeedConnectionId feedId,
FeedActivityType... feedActivityTypes) throws MetadataException {
FeedActivity feedActivity = null;
try {
feedActivity = metadataNode.getRecentFeedActivity(ctx.getJobId(), feedId, feedActivityTypes);
} catch (RemoteException e) {
throw new MetadataException(e);
}
return feedActivity;
}
public void dropLibrary(MetadataTransactionContext ctx, String dataverseName, String libraryName)
throws MetadataException {
try {
metadataNode.dropLibrary(ctx.getJobId(), dataverseName, libraryName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.dropLibrary(dataverseName, libraryName);
}
@Override
public List<Library> getDataverseLibraries(MetadataTransactionContext ctx, String dataverseName)
throws MetadataException {
        List<Library> dataverseLibraries = null;
        try {
            // Assuming that the transaction can read its own writes on the
            // metadata node.
            dataverseLibraries = metadataNode.getDataverseLibraries(ctx.getJobId(), dataverseName);
        } catch (RemoteException e) {
            throw new MetadataException(e);
        }
        // Don't update the cache to avoid checking against the transaction's
        // uncommitted libraries.
        return dataverseLibraries;
}
@Override
public void addLibrary(MetadataTransactionContext ctx, Library library) throws MetadataException {
try {
metadataNode.addLibrary(ctx.getJobId(), library);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.addLibrary(library);
}
@Override
public Library getLibrary(MetadataTransactionContext ctx, String dataverseName, String libraryName)
throws MetadataException, RemoteException {
Library library = null;
try {
library = metadataNode.getLibrary(ctx.getJobId(), dataverseName, libraryName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
return library;
}
@Override
public void acquireWriteLatch() {
metadataLatch.writeLock().lock();
}
@Override
public void releaseWriteLatch() {
metadataLatch.writeLock().unlock();
}
@Override
public void acquireReadLatch() {
metadataLatch.readLock().lock();
}
@Override
public void releaseReadLatch() {
metadataLatch.readLock().unlock();
}
@Override
public FeedPolicy getFeedPolicy(MetadataTransactionContext ctx, String dataverse, String policyName)
throws MetadataException {
        FeedPolicy feedPolicy = null;
        try {
            feedPolicy = metadataNode.getFeedPolicy(ctx.getJobId(), dataverse, policyName);
        } catch (RemoteException e) {
            throw new MetadataException(e);
        }
        return feedPolicy;
}
@Override
public List<FeedActivity> getActiveFeeds(MetadataTransactionContext ctx, String dataverse, String dataset)
throws MetadataException {
List<FeedActivity> feedActivities = null;
try {
feedActivities = metadataNode.getActiveFeeds(ctx.getJobId(), dataverse, dataset);
} catch (RemoteException e) {
throw new MetadataException(e);
}
return feedActivities;
}
@Override
public Feed getFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException {
Feed feed = null;
try {
feed = metadataNode.getFeed(ctx.getJobId(), dataverse, feedName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
return feed;
}
@Override
public void dropFeed(MetadataTransactionContext ctx, String dataverse, String feedName) throws MetadataException {
try {
metadataNode.dropFeed(ctx.getJobId(), dataverse, feedName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.dropFeed(dataverse, feedName);
}
@Override
public void addFeed(MetadataTransactionContext ctx, Feed feed) throws MetadataException {
try {
metadataNode.addFeed(ctx.getJobId(), feed);
} catch (RemoteException e) {
throw new MetadataException(e);
}
ctx.addFeed(feed);
}
public List<FeedActivity> getConnectFeedActivitiesForFeed(MetadataTransactionContext ctx, String dataverse,
String feedName) throws MetadataException {
List<FeedActivity> feedActivities = null;
try {
feedActivities = metadataNode.getDatasetsServedByFeed(ctx.getJobId(), dataverse, feedName);
} catch (RemoteException e) {
throw new MetadataException(e);
}
return feedActivities;
}
public List<DatasourceAdapter> getDataverseAdapters(MetadataTransactionContext mdTxnCtx, String dataverse)
throws MetadataException {
List<DatasourceAdapter> dataverseAdapters;
try {
dataverseAdapters = metadataNode.getDataverseAdapters(mdTxnCtx.getJobId(), dataverse);
} catch (RemoteException e) {
throw new MetadataException(e);
}
return dataverseAdapters;
}
@Override
public List<ExternalFile> getDatasetExternalFiles(MetadataTransactionContext mdTxnCtx, Dataset dataset)
throws MetadataException {
List<ExternalFile> externalFiles;
try {
externalFiles = metadataNode.getExternalFiles(mdTxnCtx.getJobId(), dataset);
} catch (RemoteException e) {
throw new MetadataException(e);
}
return externalFiles;
}
@Override
public void addExternalFile(MetadataTransactionContext ctx, ExternalFile externalFile) throws MetadataException {
try {
metadataNode.addExternalFile(ctx.getJobId(), externalFile);
} catch (RemoteException e) {
throw new MetadataException(e);
}
}
@Override
public void dropExternalFile(MetadataTransactionContext ctx, ExternalFile externalFile) throws MetadataException {
try {
metadataNode.dropExternalFile(ctx.getJobId(), externalFile.getDataverseName(), externalFile.getDatasetName(),
externalFile.getFileNumber());
} catch (RemoteException e) {
throw new MetadataException(e);
}
}
@Override
public ExternalFile getExternalFile(MetadataTransactionContext ctx, String dataverseName, String datasetName,
Integer fileNumber) throws MetadataException {
ExternalFile file;
try {
file = metadataNode.getExternalFile(ctx.getJobId(), dataverseName, datasetName, fileNumber);
} catch (RemoteException e) {
throw new MetadataException(e);
}
return file;
}
    // TODO: Optimize: pass keys instead of the whole Dataset object.
@Override
public void dropDatasetExternalFiles(MetadataTransactionContext mdTxnCtx, Dataset dataset) throws MetadataException {
try {
metadataNode.dropExternalFiles(mdTxnCtx.getJobId(), dataset);
} catch (RemoteException e) {
throw new MetadataException(e);
}
}
@Override
public void updateDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException {
try {
metadataNode.updateDataset(ctx.getJobId(), dataset);
} catch (RemoteException e) {
throw new MetadataException(e);
}
        // Reflect the updated dataset in the cache when this transaction commits.
        ctx.dropDataset(dataset.getDataverseName(), dataset.getDatasetName());
ctx.addDataset(dataset);
}
}