| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.asterix.bad.function.rewriter; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.asterix.common.exceptions.CompilationException; |
| import org.apache.asterix.common.exceptions.ErrorCode; |
| import org.apache.asterix.common.metadata.DataverseName; |
| import org.apache.asterix.external.feed.watch.FeedActivityDetails; |
| import org.apache.asterix.external.util.ExternalDataUtils; |
| import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType; |
| import org.apache.asterix.metadata.declared.DataSourceId; |
| import org.apache.asterix.metadata.declared.FeedDataSource; |
| import org.apache.asterix.metadata.declared.MetadataProvider; |
| import org.apache.asterix.metadata.entities.Dataset; |
| import org.apache.asterix.metadata.entities.Feed; |
| import org.apache.asterix.metadata.entities.FeedConnection; |
| import org.apache.asterix.metadata.entities.FeedPolicyEntity; |
| import org.apache.asterix.metadata.entities.InternalDatasetDetails; |
| import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies; |
| import org.apache.asterix.om.functions.BuiltinFunctions; |
| import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter; |
| import org.apache.asterix.om.typecomputer.base.IResultTypeComputer; |
| import org.apache.asterix.om.types.ARecordType; |
| import org.apache.asterix.om.types.BuiltinType; |
| import org.apache.asterix.om.types.IAType; |
| import org.apache.asterix.om.utils.ConstantExpressionUtil; |
| import org.apache.asterix.optimizer.rules.UnnestToDataScanRule; |
| import org.apache.asterix.translator.util.PlanTranslationUtil; |
| import org.apache.commons.lang3.mutable.Mutable; |
| import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; |
| import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; |
| import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; |
| import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; |
| import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; |
| import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; |
| import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; |
| import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; |
| import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; |
| import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator; |
| import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator; |
| |
| public class BADFeedRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer { |
| public static final BADFeedRewriter INSTANCE = new BADFeedRewriter(); |
| |
| private BADFeedRewriter() { |
| } |
| |
| @Override |
| public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { |
| AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef); |
| UnnestOperator unnest = (UnnestOperator) opRef.getValue(); |
| if (unnest.getPositionalVariable() != null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnest.getSourceLocation(), |
| "No positional variables are allowed over feeds."); |
| } |
| DataverseName dataverseName = |
| DataverseName.createFromCanonicalForm(ConstantExpressionUtil.getStringArgument(f, 0)); |
| String sourceFeedName = ConstantExpressionUtil.getStringArgument(f, 1); |
| String getTargetFeed = ConstantExpressionUtil.getStringArgument(f, 2); |
| String subscriptionLocation = ConstantExpressionUtil.getStringArgument(f, 3); |
| String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4); |
| String outputType = ConstantExpressionUtil.getStringArgument(f, 5); |
| MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider(); |
| DataSourceId asid = new DataSourceId(dataverseName, getTargetFeed); |
| String policyName = (String) metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME); |
| FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverseName, policyName); |
| if (policy == null) { |
| policy = BuiltinFeedPolicies.getFeedPolicy(policyName); |
| if (policy == null) { |
| throw new CompilationException(ErrorCode.COMPILATION_ERROR, unnest.getSourceLocation(), |
| "Unknown feed policy:" + policyName); |
| } |
| } |
| ArrayList<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>(); |
| String csLocations = (String) metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS); |
| List<LogicalVariable> pkVars = new ArrayList<>(); |
| FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation, |
| metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars); |
| // The order for feeds is <Record-Meta-PK> |
| feedDataScanOutputVariables.add(unnest.getVariable()); |
| // Does it produce meta? |
| if (ds.hasMeta()) { |
| feedDataScanOutputVariables.add(context.newVar()); |
| } |
| // Does it produce pk? |
| if (ds.isChange()) { |
| feedDataScanOutputVariables.addAll(pkVars); |
| } |
| DataSourceScanOperator scan = new DataSourceScanOperator(feedDataScanOutputVariables, ds); |
| scan.setSourceLocation(unnest.getSourceLocation()); |
| List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs(); |
| scanInpList.addAll(unnest.getInputs()); |
| opRef.setValue(scan); |
| context.computeAndSetTypeEnvironmentForOperator(scan); |
| return true; |
| } |
| |
| private FeedDataSource createFeedDataSource(DataSourceId id, String targetDataset, String sourceFeedName, |
| String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy, |
| String outputType, String locations, LogicalVariable recordVar, IOptimizationContext context, |
| List<LogicalVariable> pkVars) throws AlgebricksException { |
| Dataset dataset = metadataProvider.findDataset(id.getDataverseName(), targetDataset); |
| ARecordType feedOutputType = (ARecordType) metadataProvider.findType(id.getDataverseName(), outputType); |
| Feed sourceFeed = metadataProvider.findFeed(id.getDataverseName(), sourceFeedName); |
| FeedConnection feedConnection = |
| metadataProvider.findFeedConnection(id.getDataverseName(), sourceFeedName, targetDataset); |
| // Is a change feed? |
| List<IAType> pkTypes = null; |
| List<List<String>> partitioningKeys = null; |
| List<Integer> keySourceIndicator = null; |
| |
| List<ScalarFunctionCallExpression> keyAccessScalarFunctionCallExpression; |
| if (ExternalDataUtils.isChangeFeed(sourceFeed.getConfiguration())) { |
| List<Mutable<ILogicalExpression>> keyAccessExpression = new ArrayList<>(); |
| keyAccessScalarFunctionCallExpression = new ArrayList<>(); |
| pkTypes = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPrimaryKeyType(); |
| partitioningKeys = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey(); |
| if (dataset.hasMetaPart()) { |
| keySourceIndicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator(); |
| } |
| for (int i = 0; i < partitioningKeys.size(); i++) { |
| List<String> key = partitioningKeys.get(i); |
| if (keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0) { |
| PlanTranslationUtil.prepareVarAndExpression(key, recordVar, pkVars, keyAccessExpression, null, |
| context, null); |
| } else { |
| PlanTranslationUtil.prepareMetaKeyAccessExpression(key, recordVar, keyAccessExpression, pkVars, |
| null, context, null); |
| } |
| } |
| keyAccessExpression.forEach( |
| expr -> keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression) expr.getValue())); |
| } else { |
| keyAccessScalarFunctionCallExpression = null; |
| } |
| FeedDataSource feedDataSource = new FeedDataSource(sourceFeed, id, targetDataset, feedOutputType, null, pkTypes, |
| keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(), |
| FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","), context.getComputationNodeDomain(), |
| feedConnection); |
| feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy); |
| return feedDataSource; |
| } |
| |
| @Override |
| public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp) |
| throws AlgebricksException { |
| AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression; |
| if (f.getArguments().size() != BuiltinFunctions.FEED_COLLECT.getArity()) { |
| throw new AlgebricksException("Incorrect number of arguments -> arity is " |
| + BuiltinFunctions.FEED_COLLECT.getArity() + ", not " + f.getArguments().size()); |
| } |
| DataverseName dataverseName = |
| DataverseName.createFromCanonicalForm(ConstantExpressionUtil.getStringArgument(f, 0)); |
| String outputTypeName = ConstantExpressionUtil.getStringArgument(f, 5); |
| if (outputTypeName == null) { |
| return BuiltinType.ANY; |
| } |
| MetadataProvider metadata = (MetadataProvider) mp; |
| IAType outputType = metadata.findType(dataverseName, outputTypeName); |
| if (outputType == null) { |
| throw new AlgebricksException("Unknown type " + outputTypeName); |
| } |
| return outputType; |
| } |
| } |