| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.asterix.lang.aql.statement; |
| |
| import java.io.StringReader; |
| import java.rmi.RemoteException; |
| import java.util.List; |
| import java.util.logging.Level; |
| import java.util.logging.Logger; |
| |
| import org.apache.asterix.active.EntityId; |
| import org.apache.asterix.common.exceptions.ACIDException; |
| import org.apache.asterix.common.exceptions.AsterixException; |
| import org.apache.asterix.common.functions.FunctionSignature; |
| import org.apache.asterix.external.feed.management.FeedConnectionRequest; |
| import org.apache.asterix.external.feed.policy.FeedPolicyAccessor; |
| import org.apache.asterix.external.feed.watch.FeedActivity; |
| import org.apache.asterix.external.util.ExternalDataConstants; |
| import org.apache.asterix.lang.aql.parser.AQLParserFactory; |
| import org.apache.asterix.lang.common.base.IParser; |
| import org.apache.asterix.lang.common.base.IParserFactory; |
| import org.apache.asterix.lang.common.base.Statement; |
| import org.apache.asterix.lang.common.statement.InsertStatement; |
| import org.apache.asterix.lang.common.statement.Query; |
| import org.apache.asterix.lang.common.util.FunctionUtil; |
| import org.apache.asterix.lang.common.visitor.base.ILangVisitor; |
| import org.apache.asterix.metadata.MetadataException; |
| import org.apache.asterix.metadata.MetadataManager; |
| import org.apache.asterix.metadata.MetadataTransactionContext; |
| import org.apache.asterix.metadata.entities.Feed; |
| import org.apache.asterix.metadata.entities.Function; |
| import org.apache.asterix.metadata.feeds.FeedMetadataUtil; |
| import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; |
| |
| /** |
| * Represents the AQL statement for subscribing to a feed. |
| * This AQL statement is private and may not be used by the end-user. |
| */ |
| public class SubscribeFeedStatement implements Statement { |
| |
| private static final Logger LOGGER = Logger.getLogger(SubscribeFeedStatement.class.getName()); |
| private final FeedConnectionRequest connectionRequest; |
| private Query query; |
| private final int varCounter; |
| private final String[] locations; |
| |
| public static final String WAIT_FOR_COMPLETION = "wait-for-completion-feed"; |
| private final IParserFactory parserFactory = new AQLParserFactory(); |
| |
| public SubscribeFeedStatement(String[] locations, FeedConnectionRequest subscriptionRequest) { |
| this.connectionRequest = subscriptionRequest; |
| this.varCounter = 0; |
| this.locations = locations; |
| } |
| |
| public void initialize(MetadataTransactionContext mdTxnCtx) throws MetadataException { |
| this.query = new Query(false); |
| EntityId sourceFeedId = connectionRequest.getFeedJointKey().getFeedId(); |
| Feed subscriberFeed = |
| MetadataManager.INSTANCE.getFeed(mdTxnCtx, connectionRequest.getReceivingFeedId().getDataverse(), |
| connectionRequest.getReceivingFeedId().getEntityName()); |
| if (subscriberFeed == null) { |
| throw new IllegalStateException(" Subscriber feed " + subscriberFeed + " not found."); |
| } |
| |
| String feedOutputType = getOutputType(mdTxnCtx); |
| FunctionSignature appliedFunction = subscriberFeed.getAppliedFunction(); |
| Function function = null; |
| if (appliedFunction != null) { |
| function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction); |
| if (function == null) { |
| throw new MetadataException(" Unknown function " + appliedFunction); |
| } else if (function.getParams().size() > 1) { |
| throw new MetadataException( |
| " Incompatible function: " + appliedFunction + " Number if arguments must be 1"); |
| } |
| } |
| |
| StringBuilder builder = new StringBuilder(); |
| builder.append("use dataverse " + sourceFeedId.getDataverse() + ";\n"); |
| builder.append("set" + " " + FunctionUtil.IMPORT_PRIVATE_FUNCTIONS + " " + "'" + Boolean.TRUE + "'" + ";\n"); |
| builder.append("set" + " " + FeedActivity.FeedActivityDetails.FEED_POLICY_NAME + " " + "'" |
| + connectionRequest.getPolicy() + "'" + ";\n"); |
| |
| builder.append("insert into dataset " + connectionRequest.getTargetDataset() + " "); |
| builder.append(" (" + " for $x in feed-collect ('" + sourceFeedId.getDataverse() + "'" + "," + "'" |
| + sourceFeedId.getEntityName() + "'" + "," + "'" |
| + connectionRequest.getReceivingFeedId().getEntityName() + "'" + "," + "'" |
| + connectionRequest.getSubscriptionLocation().name() + "'" + "," + "'" |
| + connectionRequest.getTargetDataset() + "'" + "," + "'" + feedOutputType + "'" + ")"); |
| |
| List<String> functionsToApply = connectionRequest.getFunctionsToApply(); |
| if ((functionsToApply != null) && functionsToApply.isEmpty()) { |
| builder.append(" return $x"); |
| } else { |
| String rValueName = "x"; |
| String lValueName = "y"; |
| int variableIndex = 0; |
| for (String functionName : functionsToApply) { |
| function = MetadataManager.INSTANCE.getFunction(mdTxnCtx, appliedFunction); |
| variableIndex++; |
| switch (function.getLanguage().toUpperCase()) { |
| case Function.LANGUAGE_AQL: |
| builder.append( |
| " let " + "$" + lValueName + variableIndex + ":=(" + function.getFunctionBody() + ")"); |
| builder.append("\n"); |
| break; |
| case Function.LANGUAGE_JAVA: |
| builder.append(" let " + "$" + lValueName + variableIndex + ":=" + functionName + "(" + "$" |
| + rValueName + ")"); |
| rValueName = lValueName + variableIndex; |
| break; |
| } |
| builder.append("\n"); |
| } |
| builder.append("return $" + lValueName + variableIndex); |
| } |
| builder.append(")"); |
| builder.append(";"); |
| if (LOGGER.isLoggable(Level.INFO)) { |
| LOGGER.info("Connect feed statement translated to\n" + builder.toString()); |
| } |
| IParser parser = parserFactory.createParser(new StringReader(builder.toString())); |
| |
| List<Statement> statements; |
| try { |
| statements = parser.parse(); |
| query = ((InsertStatement) statements.get(3)).getQuery(); |
| } catch (AsterixException pe) { |
| throw new MetadataException(pe); |
| } |
| |
| } |
| |
| public Query getQuery() { |
| return query; |
| } |
| |
| public int getVarCounter() { |
| return varCounter; |
| } |
| |
| @Override |
| public byte getKind() { |
| return Statement.Kind.SUBSCRIBE_FEED; |
| } |
| |
| public String getPolicy() { |
| return connectionRequest.getPolicy(); |
| } |
| |
| public FeedConnectionRequest getSubscriptionRequest() { |
| return connectionRequest; |
| } |
| |
| @Override |
| public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws AsterixException { |
| return null; |
| } |
| |
| public String getDataverseName() { |
| return connectionRequest.getReceivingFeedId().getDataverse(); |
| } |
| |
| private String getOutputType(MetadataTransactionContext mdTxnCtx) throws MetadataException { |
| String outputType = null; |
| EntityId feedId = connectionRequest.getReceivingFeedId(); |
| Feed feed = MetadataManager.INSTANCE.getFeed(mdTxnCtx, feedId.getDataverse(), feedId.getEntityName()); |
| FeedPolicyAccessor policyAccessor = new FeedPolicyAccessor(connectionRequest.getPolicyParameters()); |
| try { |
| switch (feed.getFeedType()) { |
| case PRIMARY: |
| outputType = FeedMetadataUtil |
| .getOutputType(feed, feed.getAdapterConfiguration(), ExternalDataConstants.KEY_TYPE_NAME) |
| .getTypeName(); |
| break; |
| case SECONDARY: |
| outputType = FeedMetadataUtil.getSecondaryFeedOutput(feed, policyAccessor, mdTxnCtx); |
| break; |
| } |
| return outputType; |
| |
| } catch (AlgebricksException | RemoteException | ACIDException ae) { |
| throw new MetadataException(ae); |
| } |
| } |
| |
| public String[] getLocations() { |
| return locations; |
| } |
| } |