/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.metadata.declared;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

import edu.uci.ics.asterix.common.annotations.TypeDataGen;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.api.IMetadataManager;
import edu.uci.ics.asterix.metadata.bootstrap.AsterixProperties;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
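
/**
 * Compiled metadata declarations for a single AQL compilation unit. Tracks
 * the connected dataverse and its data format, caches type declarations and
 * per-node store configuration, and resolves datasets, indexes, and file
 * splits through the MetadataManager within one metadata transaction.
 */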
public class AqlCompiledMetadataDeclarations {
private static final Logger LOGGER = Logger.getLogger(AqlCompiledMetadataDeclarations.class.getName());
// We assume that there is one AqlCompiledMetadataDeclarations per
// transaction.
private final MetadataTransactionContext mdTxnCtx;
private String dataverseName = null;
private FileSplit outputFile;
private Map<String, String[]> stores;
private IDataFormat format;
private Map<String, String> config;
private final Map<String, IAType> types;
private final Map<String, TypeDataGen> typeDataGenMap;
private final IAWriterFactory writerFactory;
private final IMetadataManager metadataManager = MetadataManager.INSTANCE;
private boolean isConnected = false;
public AqlCompiledMetadataDeclarations(MetadataTransactionContext mdTxnCtx, String dataverseName,
FileSplit outputFile, Map<String, String> config, Map<String, String[]> stores, Map<String, IAType> types,
Map<String, TypeDataGen> typeDataGenMap, IAWriterFactory writerFactory, boolean online) {
this.mdTxnCtx = mdTxnCtx;
this.dataverseName = dataverseName;
this.outputFile = outputFile;
this.config = config;
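// If no explicit stores were supplied and the system is online, fall back
// to the store configuration registered in AsterixProperties.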
if (stores == null && online) {
this.stores = AsterixProperties.INSTANCE.getStores();
} else {
this.stores = stores;
}
this.types = types;
this.typeDataGenMap = typeDataGenMap;
this.writerFactory = writerFactory;
}
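// Binds these declarations to the given dataverse and instantiates its
// data format; fails if already connected or if the dataverse is unknown.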
public void connectToDataverse(String dvName) throws AlgebricksException, AsterixException {
if (isConnected) {
throw new AlgebricksException("You are already connected to " + dataverseName + " dataverse");
}
Dataverse dv;
try {
dv = metadataManager.getDataverse(mdTxnCtx, dvName);
} catch (Exception e) {
throw new AsterixException(e);
}
if (dv == null) {
throw new AlgebricksException("There is no dataverse with this name " + dvName + " to connect to.");
}
dataverseName = dvName;
isConnected = true;
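// The data format implementation is registered by class name in the
// dataverse metadata and is instantiated reflectively here.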
try {
format = (IDataFormat) Class.forName(dv.getDataFormat()).newInstance();
} catch (Exception e) {
throw new AsterixException(e);
}
}
public void disconnectFromDataverse() throws AlgebricksException {
if (!isConnected) {
throw new AlgebricksException("You are not connected to any dataverse");
}
dataverseName = null;
format = null;
isConnected = false;
}
public boolean isConnectedToDataverse() {
return isConnected;
}
public String getDataverseName() {
return dataverseName;
}
public FileSplit getOutputFile() {
return outputFile;
}
public IDataFormat getFormat() throws AlgebricksException {
if (!isConnected) {
throw new AlgebricksException("You need first to connect to a dataverse.");
}
return format;
}
public String getPropertyValue(String propertyName) {
return config.get(propertyName);
}
public IAType findType(String typeName) {
Datatype type;
try {
type = metadataManager.getDatatype(mdTxnCtx, dataverseName, typeName);
} catch (Exception e) {
throw new IllegalStateException("Could not look up datatype " + typeName, e);
}
if (type == null) {
throw new IllegalStateException("No datatype named " + typeName + " in dataverse " + dataverseName);
}
return type.getDatatype();
}
public List<String> findNodeGroupNodeNames(String nodeGroupName) throws AlgebricksException {
NodeGroup ng;
try {
ng = metadataManager.getNodegroup(mdTxnCtx, nodeGroupName);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
if (ng == null) {
throw new AlgebricksException("No node group with this name " + nodeGroupName);
}
return ng.getNodeNames();
}
public String[] getStores(String nodeName) {
return stores.get(nodeName);
}
public Map<String, String[]> getAllStores() {
return stores;
}
public Dataset findDataset(String datasetName) throws AlgebricksException {
try {
return metadataManager.getDataset(mdTxnCtx, dataverseName, datasetName);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
}
public List<Index> getDatasetIndexes(String dataverseName, String datasetName) throws AlgebricksException {
try {
return metadataManager.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
}
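// By convention, a dataset's primary index is stored under the dataset's
// own name, so the dataset name doubles as the index name here.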
public Index getDatasetPrimaryIndex(String dataverseName, String datasetName) throws AlgebricksException {
try {
return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, datasetName);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
}
public Index getIndex(String dataverseName, String datasetName, String indexName) throws AlgebricksException {
try {
return metadataManager.getIndex(mdTxnCtx, dataverseName, datasetName, indexName);
} catch (MetadataException e) {
throw new AlgebricksException(e);
}
}
public void setOutputFile(FileSplit outputFile) {
this.outputFile = outputFile;
}
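// Computes the file splits for the given index of an internal or feed
// dataset, and derives an absolute partition constraint pinning each split
// to the node that stores it.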
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
String datasetName, String targetIdxName) throws AlgebricksException {
FileSplit[] splits = splitsForInternalOrFeedDataset(datasetName, targetIdxName);
IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
String[] loc = new String[splits.length];
for (int p = 0; p < splits.length; p++) {
loc[p] = splits[p].getNodeName();
}
AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
}
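// An inverted index is materialized as two co-located files per partition:
// the BTree component (suffix "_$btree") and the inverted lists component
// (suffix "_$invlists").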
public Pair<IFileSplitProvider, IFileSplitProvider> getInvertedIndexFileSplitProviders(
IFileSplitProvider splitProvider) {
int numSplits = splitProvider.getFileSplits().length;
FileSplit[] btreeSplits = new FileSplit[numSplits];
FileSplit[] invListsSplits = new FileSplit[numSplits];
for (int i = 0; i < numSplits; i++) {
String nodeName = splitProvider.getFileSplits()[i].getNodeName();
String path = splitProvider.getFileSplits()[i].getLocalFile().getFile().getPath();
btreeSplits[i] = new FileSplit(nodeName, path + "_$btree");
invListsSplits[i] = new FileSplit(nodeName, path + "_$invlists");
}
return new Pair<IFileSplitProvider, IFileSplitProvider>(new ConstantFileSplitProvider(btreeSplits),
new ConstantFileSplitProvider(invListsSplits));
}
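// Builds one file split per store directory on each node of the dataset's
// node group; an index resides at <store>/<dataverse>/<dataset>_idx_<index>.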
private FileSplit[] splitsForInternalOrFeedDataset(String datasetName, String targetIdxName)
throws AlgebricksException {
File relPathFile = new File(getRelativePath(datasetName + "_idx_" + targetIdxName));
Dataset dataset = findDataset(datasetName);
if (dataset == null) {
throw new AlgebricksException("No dataset named " + datasetName + " in dataverse " + dataverseName);
}
if (dataset.getDatasetType() != DatasetType.INTERNAL && dataset.getDatasetType() != DatasetType.FEED) {
throw new AlgebricksException("Dataset " + datasetName + " is not an internal or feed dataset.");
}
InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dataset.getDatasetDetails();
List<String> nodeGroup = findNodeGroupNodeNames(datasetDetails.getNodeGroupName());
if (nodeGroup == null) {
throw new AlgebricksException("Couldn't find node group " + datasetDetails.getNodeGroupName());
}
List<FileSplit> splitArray = new ArrayList<FileSplit>();
for (String nd : nodeGroup) {
String[] nodeStores = stores.get(nd);
if (nodeStores == null) {
LOGGER.warning("Node " + nd + " has no stores.");
throw new AlgebricksException("Node " + nd + " has no stores.");
} else {
for (int j = 0; j < nodeStores.length; j++) {
File f = new File(nodeStores[j] + File.separator + relPathFile);
splitArray.add(new FileSplit(nd, new FileReference(f)));
}
}
}
return splitArray.toArray(new FileSplit[splitArray.size()]);
}
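// All artifacts of a dataverse live under a directory named after it.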
public String getRelativePath(String fileName) {
return dataverseName + File.separator + fileName;
}
public Map<String, TypeDataGen> getTypeDataGenMap() {
return typeDataGenMap;
}
public Map<String, IAType> getTypeDeclarations() {
return types;
}
public IAWriterFactory getWriterFactory() {
return writerFactory;
}
public MetadataTransactionContext getMetadataTransactionContext() {
return mdTxnCtx;
}
}