/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.metadata.declared;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType;
import edu.uci.ics.asterix.metadata.entities.Dataset;
import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails;
import edu.uci.ics.asterix.metadata.utils.DatasetUtils;
import edu.uci.ics.asterix.om.types.ARecordType;
import edu.uci.ics.asterix.om.types.IAType;

import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSource;
import edu.uci.ics.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.AsterixNodeGroupDomain;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.INodeDomain;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.OrderColumn;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
import edu.uci.ics.hyracks.algebricks.core.algebra.properties.UnorderedPartitionedProperty;
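
/**
 * Compile-time (metadata) description of an AQL data source, i.e. an AsterixDB
 * dataset. It exposes the dataset's schema types and node-group domain to the
 * Algebricks optimizer and reports the physical properties that a scan over
 * the dataset can be assumed to satisfy.
 */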
public class AqlDataSource implements IDataSource<AqlSourceId> {
private AqlSourceId id;
private Dataset dataset;
private IAType[] schemaTypes;
private INodeDomain domain;
private AqlDataSourceType datasourceType;
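    // How the data source is materialized: stored locally, consumed as a feed,
    // stored externally, or consumed as a feed over external data.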
public enum AqlDataSourceType {
INTERNAL,
FEED,
EXTERNAL,
EXTERNAL_FEED
}
public AqlDataSource(AqlSourceId id, Dataset dataset, IAType itemType, AqlDataSourceType datasourceType)
throws AlgebricksException {
this.id = id;
this.dataset = dataset;
this.datasourceType = datasourceType;
try {
            switch (datasourceType) {
                case FEED:
                    initFeedDataset(itemType, dataset);
                    break;
                case INTERNAL:
                    initInternalDataset(itemType);
                    break;
                case EXTERNAL_FEED:
                case EXTERNAL:
                    initExternalDataset(itemType);
                    break;
                default:
                    throw new IllegalArgumentException();
            }
} catch (IOException e) {
throw new AlgebricksException(e);
}
}
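    /**
     * Builds a data source whose source type is derived from the dataset's own
     * metadata instead of being supplied by the caller.
     */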
public AqlDataSource(AqlSourceId id, Dataset dataset, IAType itemType) throws AlgebricksException {
this.id = id;
this.dataset = dataset;
try {
            switch (dataset.getDatasetType()) {
                case FEED:
                    this.datasourceType = AqlDataSourceType.FEED;
                    initFeedDataset(itemType, dataset);
                    break;
                case INTERNAL:
                    this.datasourceType = AqlDataSourceType.INTERNAL;
                    initInternalDataset(itemType);
                    break;
                case EXTERNAL:
                    this.datasourceType = AqlDataSourceType.EXTERNAL;
                    initExternalDataset(itemType);
                    break;
                default:
                    throw new IllegalArgumentException();
            }
} catch (IOException e) {
throw new AlgebricksException(e);
}
}
    // The schema of a dataset is the sequence of its partitioning-key field
    // types followed by the full record type.
private void initInternalDataset(IAType itemType) throws IOException {
List<String> partitioningKeys = DatasetUtils.getPartitioningKeys(dataset);
ARecordType recordType = (ARecordType) itemType;
int n = partitioningKeys.size();
schemaTypes = new IAType[n + 1];
for (int i = 0; i < n; i++) {
schemaTypes[i] = recordType.getFieldType(partitioningKeys.get(i));
}
schemaTypes[n] = itemType;
domain = new AsterixNodeGroupDomain(DatasetUtils.getNodegroupName(dataset));
}
    private void initFeedDataset(IAType itemType, Dataset dataset) throws IOException {
        if (dataset.getDatasetDetails() instanceof ExternalDatasetDetails) {
            initExternalDataset(itemType);
        } else {
            // An internal feed dataset has the same schema and domain as an internal dataset.
            initInternalDataset(itemType);
        }
    }
private void initExternalDataset(IAType itemType) {
schemaTypes = new IAType[1];
schemaTypes[0] = itemType;
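        // External data belongs to no node group, so its domain has an unknown
        // cardinality and is equal only to itself.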
INodeDomain domainForExternalData = new INodeDomain() {
@Override
public Integer cardinality() {
return null;
}
@Override
public boolean sameAs(INodeDomain domain) {
return domain == this;
}
};
domain = domainForExternalData;
}
@Override
public AqlSourceId getId() {
return id;
}
public Dataset getDataset() {
return dataset;
}
@Override
public IAType[] getSchemaTypes() {
return schemaTypes;
}
@Override
public String toString() {
return id.toString();
// return "AqlDataSource(\"" + id.getDataverseName() + "/" +
// id.getDatasetName() + "\")";
}
@Override
public IDataSourcePropertiesProvider getPropertiesProvider() {
return new AqlDataSourcePartitioningProvider(dataset.getDatasetType(), domain);
}
@Override
    public void computeFDs(List<LogicalVariable> scanVariables, List<FunctionalDependency> fdList) {
        int n = scanVariables.size();
        if (n > 1) {
            // The first n - 1 scan variables are the primary-key variables; together
            // they functionally determine every variable produced by the scan.
            List<LogicalVariable> head = new ArrayList<LogicalVariable>(scanVariables.subList(0, n - 1));
            List<LogicalVariable> tail = new ArrayList<LogicalVariable>(scanVariables);
            fdList.add(new FunctionalDependency(head, tail));
        }
    }
}
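    /**
     * Computes the partitioning and local-order properties that the optimizer
     * may assume for a scan over a dataset of the given type.
     */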
private static class AqlDataSourcePartitioningProvider implements IDataSourcePropertiesProvider {
private INodeDomain domain;
private DatasetType datasetType;
public AqlDataSourcePartitioningProvider(DatasetType datasetType, INodeDomain domain) {
this.datasetType = datasetType;
this.domain = domain;
}
@Override
public IPhysicalPropertiesVector computePropertiesVector(List<LogicalVariable> scanVariables) {
            switch (datasetType) {
                case EXTERNAL: {
                    // External data arrives with no known partitioning or ordering.
                    IPartitioningProperty pp = new RandomPartitioningProperty(domain);
                    List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
                    return new StructuralPropertiesVector(pp, propsLocal);
                }
                case FEED: {
                    // A feed is hash-partitioned on its primary-key variables (all scan
                    // variables except the last, which binds the record), but carries
                    // no local ordering guarantees.
                    int n = scanVariables.size();
                    IPartitioningProperty pp;
                    if (n < 2) {
                        pp = new RandomPartitioningProperty(domain);
                    } else {
                        Set<LogicalVariable> pvars = new HashSet<LogicalVariable>(scanVariables.subList(0, n - 1));
                        pp = new UnorderedPartitionedProperty(pvars, domain);
                    }
                    List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
                    return new StructuralPropertiesVector(pp, propsLocal);
                }
                case INTERNAL: {
                    // An internal dataset is hash-partitioned on its primary-key variables
                    // and, within each partition, stored in ascending primary-key order.
                    int n = scanVariables.size();
                    IPartitioningProperty pp;
                    if (n < 2) {
                        pp = new RandomPartitioningProperty(domain);
                    } else {
                        Set<LogicalVariable> pvars = new HashSet<LogicalVariable>(scanVariables.subList(0, n - 1));
                        pp = new UnorderedPartitionedProperty(pvars, domain);
                    }
                    List<ILocalStructuralProperty> propsLocal = new ArrayList<ILocalStructuralProperty>();
                    for (int i = 0; i < n - 1; i++) {
                        propsLocal.add(new LocalOrderProperty(new OrderColumn(scanVariables.get(i), OrderKind.ASC)));
                    }
                    return new StructuralPropertiesVector(pp, propsLocal);
                }
                default: {
                    throw new IllegalArgumentException();
                }
            }
}
}
public AqlDataSourceType getDatasourceType() {
return datasourceType;
}
}