// blob: 7e25a230dd6af660d68aeaefdad3ec8d34a74af0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad.function.rewriter;
import java.util.ArrayList;
import java.util.List;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.feed.watch.FeedActivityDetails;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.metadata.declared.DataSourceId;
import org.apache.asterix.metadata.declared.FeedDataSource;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Feed;
import org.apache.asterix.metadata.entities.FeedConnection;
import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.ConstantExpressionUtil;
import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
import org.apache.asterix.translator.util.PlanTranslationUtil;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
/**
 * Rewrites an unnest over the feed-collect function call into a {@link DataSourceScanOperator} that
 * scans a {@link FeedDataSource}, and computes the result type of that function call.
 * <p>
 * The function call's constant string arguments are read positionally:
 * <ul>
 * <li>0 - dataverse name in canonical form</li>
 * <li>1 - source feed name</li>
 * <li>2 - target feed name (used as the data source name)</li>
 * <li>3 - subscription location (parsed as a {@link FeedRuntimeType} constant)</li>
 * <li>4 - target dataset name</li>
 * <li>5 - output type name</li>
 * </ul>
 * The class holds no instance state; use {@link #INSTANCE}.
 */
public class BADFeedRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer {

    public static final BADFeedRewriter INSTANCE = new BADFeedRewriter();

    private BADFeedRewriter() {
    }

    /**
     * Replaces the unnest operator held by {@code opRef} with a data source scan over the feed.
     * <p>
     * The scan's output variables are ordered record, then meta (if the data source has meta), then
     * primary keys (if the data source is a change feed). The feed policy name and the collect
     * locations are read from the metadata provider's configuration.
     *
     * @param opRef
     *            reference to the unnest operator; its value is replaced by the new scan
     * @param context
     *            optimization context used for new variables and for the type environment
     * @return always {@code true}
     * @throws AlgebricksException
     *             if the unnest has a positional variable, the feed policy cannot be resolved, the
     *             collect locations are not configured, or the feed data source cannot be created
     */
    @Override
    public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
        AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
        UnnestOperator unnest = (UnnestOperator) opRef.getValue();
        if (unnest.getPositionalVariable() != null) {
            throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnest.getSourceLocation(),
                    "No positional variables are allowed over feeds.");
        }

        DataverseName dataverseName =
                DataverseName.createFromCanonicalForm(ConstantExpressionUtil.getStringArgument(f, 0));
        String sourceFeedName = ConstantExpressionUtil.getStringArgument(f, 1);
        String targetFeedName = ConstantExpressionUtil.getStringArgument(f, 2);
        String subscriptionLocation = ConstantExpressionUtil.getStringArgument(f, 3);
        String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
        String outputType = ConstantExpressionUtil.getStringArgument(f, 5);

        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
        DataSourceId feedSourceId = new DataSourceId(dataverseName, targetFeedName);
        FeedPolicyEntity policy = resolveFeedPolicy(metadataProvider, dataverseName, unnest);

        String csLocations = (String) metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
        if (csLocations == null) {
            // Without this check the missing setting only surfaces later as a NullPointerException
            // when the locations string is split.
            throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnest.getSourceLocation(),
                    "Feed collect locations are not specified.");
        }

        List<LogicalVariable> pkVars = new ArrayList<>();
        FeedDataSource ds = createFeedDataSource(feedSourceId, targetDataset, sourceFeedName, subscriptionLocation,
                metadataProvider, policy, outputType, csLocations, unnest, context, pkVars);

        // The order for feeds is <Record-Meta-PK>
        List<LogicalVariable> scanVars = new ArrayList<>();
        scanVars.add(unnest.getVariable());
        // Does it produce meta?
        if (ds.hasMeta()) {
            scanVars.add(context.newVar());
        }
        // Does it produce pk?
        if (ds.isChange()) {
            scanVars.addAll(pkVars);
        }

        DataSourceScanOperator scan = new DataSourceScanOperator(scanVars, ds);
        scan.setSourceLocation(unnest.getSourceLocation());
        scan.getInputs().addAll(unnest.getInputs());
        opRef.setValue(scan);
        context.computeAndSetTypeEnvironmentForOperator(scan);
        return true;
    }

    /**
     * Resolves the feed policy named in the metadata provider's configuration, looking first in the
     * metadata of {@code dataverseName} and then among the built-in policies.
     *
     * @throws AlgebricksException
     *             if neither lookup yields a policy
     */
    private FeedPolicyEntity resolveFeedPolicy(MetadataProvider metadataProvider, DataverseName dataverseName,
            UnnestOperator unnest) throws AlgebricksException {
        String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
        FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverseName, policyName);
        if (policy == null) {
            policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
        }
        if (policy == null) {
            throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnest.getSourceLocation(),
                    "Unknown feed policy:" + policyName);
        }
        return policy;
    }

    /**
     * Builds the {@link FeedDataSource} for the scan.
     * <p>
     * When the source feed's configuration marks it as a change feed, one primary-key variable and
     * one key access expression are prepared per partitioning key of the target dataset; the
     * variables are appended to {@code pkVars}. Keys whose source indicator is non-zero are accessed
     * through the meta-key helper, all others through the record.
     *
     * @param id
     *            data source id; its dataverse is used for all metadata lookups
     * @param targetDataset
     *            name of the dataset the feed is connected to
     * @param sourceFeedName
     *            name of the feed to look up
     * @param subscriptionLocation
     *            name of a {@link FeedRuntimeType} constant
     * @param metadataProvider
     *            provider used for the dataset, type, feed and feed connection lookups
     * @param feedPolicy
     *            policy stored in the data source properties
     * @param outputType
     *            name of the feed's output record type
     * @param locations
     *            comma-separated locations, must not be {@code null}
     * @param unnest
     *            the unnest being rewritten; supplies the record variable and the source location
     * @param context
     *            optimization context
     * @param pkVars
     *            output list receiving the primary-key variables of a change feed
     * @return the feed data source, with the feed policy stored in its properties
     * @throws AlgebricksException
     *             if the feed lookup yields nothing, or the feed is a change feed and the dataset
     *             lookup yields nothing
     */
    private FeedDataSource createFeedDataSource(DataSourceId id, String targetDataset, String sourceFeedName,
            String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy,
            String outputType, String locations, UnnestOperator unnest, IOptimizationContext context,
            List<LogicalVariable> pkVars) throws AlgebricksException {
        DataverseName dataverseName = id.getDataverseName();
        Dataset dataset = metadataProvider.findDataset(dataverseName, targetDataset);
        ARecordType feedOutputType = (ARecordType) metadataProvider.findType(dataverseName, outputType);
        Feed sourceFeed = metadataProvider.findFeed(dataverseName, sourceFeedName);
        if (sourceFeed == null) {
            // The feed is dereferenced unconditionally below; fail with a compilation error instead.
            throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnest.getSourceLocation(),
                    "Unknown feed: " + sourceFeedName);
        }
        FeedConnection feedConnection =
                metadataProvider.findFeedConnection(dataverseName, sourceFeedName, targetDataset);

        // Is a change feed?
        List<IAType> pkTypes = null;
        List<ScalarFunctionCallExpression> keyAccessCalls = null;
        if (ExternalDataUtils.isChangeFeed(sourceFeed.getConfiguration())) {
            if (dataset == null) {
                // The dataset is only needed (and dereferenced) for change feeds.
                throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnest.getSourceLocation(),
                        "Unknown dataset: " + targetDataset);
            }
            InternalDatasetDetails datasetDetails = (InternalDatasetDetails) dataset.getDatasetDetails();
            pkTypes = datasetDetails.getPrimaryKeyType();
            List<List<String>> partitioningKeys = datasetDetails.getPartitioningKey();
            List<Integer> keySourceIndicator = dataset.hasMetaPart() ? datasetDetails.getKeySourceIndicator() : null;

            LogicalVariable recordVar = unnest.getVariable();
            List<Mutable<ILogicalExpression>> keyAccessExprs = new ArrayList<>();
            for (int i = 0; i < partitioningKeys.size(); i++) {
                List<String> key = partitioningKeys.get(i);
                boolean keyInRecord = keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0;
                if (keyInRecord) {
                    PlanTranslationUtil.prepareVarAndExpression(key, recordVar, pkVars, keyAccessExprs, null,
                            context, null);
                } else {
                    PlanTranslationUtil.prepareMetaKeyAccessExpression(key, recordVar, keyAccessExprs, pkVars,
                            null, context, null);
                }
            }

            keyAccessCalls = new ArrayList<>(keyAccessExprs.size());
            for (Mutable<ILogicalExpression> exprRef : keyAccessExprs) {
                keyAccessCalls.add((ScalarFunctionCallExpression) exprRef.getValue());
            }
        }

        FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, id, targetDataset, feedOutputType, null,
                pkTypes, keyAccessCalls, sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation),
                locations.split(","), context.getComputationNodeDomain(), feedConnection);
        feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
        return feedDataSource;
    }

    /**
     * Computes the result type of the feed-collect function call: the type named by argument 5,
     * looked up in the dataverse named by argument 0, or {@link BuiltinType#ANY} when argument 5 is
     * not available as a string.
     *
     * @param expression
     *            the function call expression
     * @param env
     *            type environment (not used)
     * @param mp
     *            metadata provider; cast to {@link MetadataProvider} for the type lookup
     * @return the resolved output type, or {@link BuiltinType#ANY}
     * @throws AlgebricksException
     *             if the argument count differs from the arity of
     *             {@link BuiltinFunctions#FEED_COLLECT}, or the named type is not found
     */
    @Override
    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
            throws AlgebricksException {
        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
        int expectedArity = BuiltinFunctions.FEED_COLLECT.getArity();
        int actualArity = f.getArguments().size();
        if (actualArity != expectedArity) {
            throw new AlgebricksException(
                    "Incorrect number of arguments -> arity is " + expectedArity + ", not " + actualArity);
        }
        DataverseName dataverseName =
                DataverseName.createFromCanonicalForm(ConstantExpressionUtil.getStringArgument(f, 0));
        String outputTypeName = ConstantExpressionUtil.getStringArgument(f, 5);
        if (outputTypeName == null) {
            return BuiltinType.ANY;
        }
        MetadataProvider metadata = (MetadataProvider) mp;
        IAType outputType = metadata.findType(dataverseName, outputTypeName);
        if (outputType == null) {
            throw new AlgebricksException("Unknown type " + outputTypeName);
        }
        return outputType;
    }
}