/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package edu.uci.ics.asterix.metadata.declared;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;

import edu.uci.ics.asterix.common.annotations.TypeDataGen;
import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.common.config.DatasetConfig.IndexType;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.formats.base.IDataFormat;
import edu.uci.ics.asterix.metadata.MetadataException;
import edu.uci.ics.asterix.metadata.MetadataManager;
import edu.uci.ics.asterix.metadata.MetadataTransactionContext;
import edu.uci.ics.asterix.metadata.api.IMetadataManager;
import edu.uci.ics.asterix.metadata.bootstrap.AsterixProperties;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledIndexDecl.IndexKind;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.Datatype;
import edu.uci.ics.asterix.metadata.entities.Dataverse;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.FeedDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.Index;
import edu.uci.ics.asterix.metadata.entities.InternalDatasetDetails;
import edu.uci.ics.asterix.metadata.entities.NodeGroup;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;

import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.common.utils.Triple;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.data.IAWriterFactory;
import edu.uci.ics.hyracks.algebricks.runtime.base.IEvaluatorFactory;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import edu.uci.ics.hyracks.dataflow.std.file.FileSplit;
import edu.uci.ics.hyracks.dataflow.std.file.IFileSplitProvider;
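
/**
 * Compiled metadata declarations for a single AQL compilation unit. Wraps a
 * metadata transaction context and resolves dataverses, datatypes, datasets,
 * indexes, and node groups through the {@link MetadataManager}.
 *
 * Illustrative usage (a minimal sketch, not taken from this code base; it
 * assumes a metadata transaction and the declaration maps have already been
 * set up elsewhere):
 *
 * <pre>
 * AqlCompiledMetadataDeclarations decls = new AqlCompiledMetadataDeclarations(mdTxnCtx, null, outputFile, config,
 *         null, types, typeDataGenMap, writerFactory, true);
 * decls.connectToDataverse("Demo");
 * AqlCompiledDatasetDecl dsDecl = decls.findDataset("Customers");
 * decls.disconnectFromDataverse();
 * </pre>
 */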
public class AqlCompiledMetadataDeclarations {
    private static final Logger LOGGER = Logger.getLogger(AqlCompiledMetadataDeclarations.class.getName());
    // We assume that there is one AqlCompiledMetadataDeclarations instance per transaction.
private final MetadataTransactionContext mdTxnCtx;
private String dataverseName = null;
private FileSplit outputFile;
private Map<String, String[]> stores;
private IDataFormat format;
private Map<String, String> config;
private final Map<String, IAType> types;
private final Map<String, TypeDataGen> typeDataGenMap;
private final IAWriterFactory writerFactory;
private IMetadataManager metadataManager = MetadataManager.INSTANCE;
private boolean isConnected = false;

    public AqlCompiledMetadataDeclarations(MetadataTransactionContext mdTxnCtx, String dataverseName,
FileSplit outputFile, Map<String, String> config, Map<String, String[]> stores, Map<String, IAType> types,
Map<String, TypeDataGen> typeDataGenMap, IAWriterFactory writerFactory, boolean online) {
this.mdTxnCtx = mdTxnCtx;
this.dataverseName = dataverseName;
this.outputFile = outputFile;
this.config = config;
        if (stores == null && online) {
            this.stores = AsterixProperties.INSTANCE.getStores();
        } else {
            this.stores = stores;
        }
this.types = types;
this.typeDataGenMap = typeDataGenMap;
this.writerFactory = writerFactory;
}
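
    /**
     * Connects these declarations to the given dataverse and instantiates the
     * data format configured for it. Only one dataverse may be connected at a
     * time.
     *
     * @throws AlgebricksException
     *             if already connected or if no dataverse with the given name exists
     * @throws AsterixException
     *             if the metadata lookup or data format instantiation fails
     */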
public void connectToDataverse(String dvName) throws AlgebricksException, AsterixException {
if (isConnected) {
throw new AlgebricksException("You are already connected to " + dataverseName + " dataverse");
}
Dataverse dv;
try {
dv = metadataManager.getDataverse(mdTxnCtx, dvName);
} catch (Exception e) {
throw new AsterixException(e);
}
if (dv == null) {
throw new AlgebricksException("There is no dataverse with this name " + dvName + " to connect to.");
}
dataverseName = dvName;
isConnected = true;
try {
            // Instantiate the data format implementation declared by the dataverse.
            format = (IDataFormat) Class.forName(dv.getDataFormat()).newInstance();
} catch (Exception e) {
throw new AsterixException(e);
}
}
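
    /**
     * Disconnects from the currently connected dataverse and clears the
     * associated data format.
     *
     * @throws AlgebricksException
     *             if not currently connected to any dataverse
     */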
    public void disconnectFromDataverse() throws AlgebricksException {
        if (!isConnected) {
            throw new AlgebricksException("You are not connected to any dataverse.");
        }
        dataverseName = null;
        format = null;
        isConnected = false;
    }

    public boolean isConnectedToDataverse() {
        return this.isConnected;
    }

    public String getDataverseName() {
        return dataverseName;
    }

    public FileSplit getOutputFile() {
        return outputFile;
    }

    public IDataFormat getFormat() throws AlgebricksException {
        if (!isConnected) {
            throw new AlgebricksException("You need to first connect to a dataverse.");
        }
        return format;
    }

    public String getPropertyValue(String propertyName) {
        return config.get(propertyName);
    }
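
    /**
     * Looks up the named datatype in the currently connected dataverse.
     *
     * @throws IllegalStateException
     *             if the metadata lookup fails or no such type exists
     */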
    public IAType findType(String typeName) {
        Datatype type;
        try {
            type = metadataManager.getDatatype(mdTxnCtx, dataverseName, typeName);
        } catch (Exception e) {
            // Preserve the cause instead of swallowing it.
            throw new IllegalStateException(e);
        }
        if (type == null) {
            throw new IllegalStateException("Could not find datatype " + typeName + " in dataverse " + dataverseName
                    + ".");
        }
        return type.getDatatype();
    }
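
    /**
     * Returns the names of the nodes that belong to the given node group.
     *
     * @throws AlgebricksException
     *             if no node group with the given name exists
     */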
public List<String> findNodeGroupNodeNames(String nodeGroupName) throws AlgebricksException, MetadataException {
NodeGroup ng = metadataManager.getNodegroup(mdTxnCtx, nodeGroupName);
if (ng == null) {
throw new AlgebricksException("No node group with this name " + nodeGroupName);
}
return ng.getNodeNames();
}

    public String[] getStores(String nodeName) {
        return stores.get(nodeName);
    }

    public Map<String, String[]> getAllStores() {
        return stores;
    }
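
    /**
     * Loads the named dataset from the metadata and compiles it into an
     * {@link AqlCompiledDatasetDecl}, including its partitioning evaluator
     * factories and its primary and secondary indexes. Returns null if no such
     * dataset exists.
     */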
public AqlCompiledDatasetDecl findDataset(String datasetName) {
try {
Dataset datasetRecord = this.metadataManager.getDataset(mdTxnCtx, dataverseName, datasetName);
if (datasetRecord == null) {
return null;
}
IAqlCompiledDatasetDetails acdd = null;
switch (datasetRecord.getType()) {
case FEED:
case INTERNAL: {
String typeName = datasetRecord.getDatatypeName();
InternalDatasetDetails id = (InternalDatasetDetails) datasetRecord.getDatasetDetails();
ARecordType recType = (ARecordType) findType(typeName);
List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> partitioningEvalFactories = computePartitioningEvaluatorFactories(
id.getPartitioningKey(), recType);
List<Index> indexRecord = this.metadataManager.getDatasetIndexes(mdTxnCtx, dataverseName,
datasetName);
AqlCompiledIndexDecl primaryIndex = null;
List<AqlCompiledIndexDecl> secondaryIndexes = new ArrayList<AqlCompiledIndexDecl>();
for (int i = 0; i < indexRecord.size(); i++) {
Index rec = indexRecord.get(i);
if (rec.isPrimaryIndex()) {
primaryIndex = new AqlCompiledIndexDecl(rec.getIndexName(), IndexKind.BTREE,
rec.getKeyFieldNames());
} else {
secondaryIndexes.add(new AqlCompiledIndexDecl(rec.getIndexName(),
rec.getIndexType() == IndexType.BTREE ? IndexKind.BTREE : IndexKind.RTREE, rec
.getKeyFieldNames()));
}
}
if (datasetRecord.getType() == DatasetType.INTERNAL) {
acdd = new AqlCompiledInternalDatasetDetails(id.getPartitioningKey(),
partitioningEvalFactories, id.getNodeGroupName(), primaryIndex, secondaryIndexes);
                    } else {
                        FeedDatasetDetails feedDetails = (FeedDatasetDetails) id;
                        acdd = new AqlCompiledFeedDatasetDetails(id.getPartitioningKey(), partitioningEvalFactories,
                                id.getNodeGroupName(), primaryIndex, secondaryIndexes, feedDetails.getAdapter(),
                                feedDetails.getProperties(), feedDetails.getFunctionIdentifier(),
                                feedDetails.getFeedState().toString());
                    }
break;
}
                case EXTERNAL: {
                    ExternalDatasetDetails externalDetails = (ExternalDatasetDetails) datasetRecord
                            .getDatasetDetails();
                    acdd = new AqlCompiledExternalDatasetDetails(externalDetails.getAdapter(),
                            externalDetails.getProperties());
                    break;
                }
}
AqlCompiledDatasetDecl dataset = new AqlCompiledDatasetDecl(datasetRecord.getDatasetName(),
datasetRecord.getDatatypeName(), datasetRecord.getType(), acdd);
return dataset;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}

    public void setOutputFile(FileSplit outputFile) {
        this.outputFile = outputFile;
    }
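
    /**
     * Builds, for each partitioning expression, the triple of evaluator
     * factory, function call expression, and key type that the connected data
     * format provides for extracting that partitioning key from a record of
     * the given type.
     */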
public List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> computePartitioningEvaluatorFactories(
List<String> partitioningExprs, ARecordType recType) {
List<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>> evalFactories = new ArrayList<Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType>>(
partitioningExprs.size());
for (String expr : partitioningExprs) {
Triple<IEvaluatorFactory, ScalarFunctionCallExpression, IAType> evalFact = null;
try {
evalFact = format.partitioningEvaluatorFactory(recType, expr);
} catch (AlgebricksException e) {
throw new IllegalStateException(e);
}
evalFactories.add(evalFact);
}
return evalFactories;
}
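
    /**
     * Computes the file splits for the given index of an internal or feed
     * dataset and wraps them in a split provider, together with an absolute
     * partition constraint that pins each split to its node.
     */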
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForInternalOrFeedDataset(
String datasetName, String targetIdxName) throws AlgebricksException, MetadataException {
FileSplit[] splits = splitsForInternalOrFeedDataset(datasetName, targetIdxName);
IFileSplitProvider splitProvider = new ConstantFileSplitProvider(splits);
String[] loc = new String[splits.length];
for (int p = 0; p < splits.length; p++) {
loc[p] = splits[p].getNodeName();
}
AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
}
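
    /**
     * Resolves one file split per store directory on each node of the
     * dataset's node group, using the relative path
     * {@code <dataverse>/<dataset>_idx_<index>} under each store.
     */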
    private FileSplit[] splitsForInternalOrFeedDataset(String datasetName, String targetIdxName)
            throws AlgebricksException, MetadataException {
        File relPathFile = new File(getRelativePath(datasetName + "_idx_" + targetIdxName));
        AqlCompiledDatasetDecl adecl = findDataset(datasetName);
        if (adecl == null) {
            throw new AlgebricksException("Could not find dataset " + datasetName + ".");
        }
        if (adecl.getDatasetType() != DatasetType.INTERNAL && adecl.getDatasetType() != DatasetType.FEED) {
            throw new AlgebricksException("Dataset " + datasetName + " is not an internal or feed dataset.");
        }
        AqlCompiledInternalDatasetDetails compiledDatasetDetails = (AqlCompiledInternalDatasetDetails) adecl
                .getAqlCompiledDatasetDetails();
        List<String> nodeGroup = findNodeGroupNodeNames(compiledDatasetDetails.getNodegroupName());
        if (nodeGroup == null) {
            throw new AlgebricksException("Could not find node group " + compiledDatasetDetails.getNodegroupName()
                    + ".");
        }
List<FileSplit> splitArray = new ArrayList<FileSplit>();
for (String nd : nodeGroup) {
String[] nodeStores = stores.get(nd);
if (nodeStores == null) {
LOGGER.warning("Node " + nd + " has no stores.");
throw new AlgebricksException("Node " + nd + " has no stores.");
} else {
for (int j = 0; j < nodeStores.length; j++) {
File f = new File(nodeStores[j] + File.separator + relPathFile);
splitArray.add(new FileSplit(nd, new FileReference(f)));
}
}
}
FileSplit[] splits = new FileSplit[splitArray.size()];
int i = 0;
for (FileSplit fs : splitArray) {
splits[i++] = fs;
}
return splits;
}
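
    /**
     * Returns the path of the given file relative to a store directory, i.e.
     * the file name prefixed with the dataverse name.
     */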
public String getRelativePath(String fileName) {
return dataverseName + File.separator + fileName;
}
public Map<String, TypeDataGen> getTypeDataGenMap() {
return typeDataGenMap;
}
public Map<String, IAType> getTypeDeclarations() {
return types;
}
public IAWriterFactory getWriterFactory() {
return writerFactory;
}
public MetadataTransactionContext getMetadataTransactionContext() {
return mdTxnCtx;
}
}