/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.bad.lang.statement;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.algebra.extension.IExtensionStatement;
import org.apache.asterix.app.active.ActiveNotificationHandler;
import org.apache.asterix.app.translator.QueryTranslator;
import org.apache.asterix.bad.BADConstants;
import org.apache.asterix.bad.ChannelJobService;
import org.apache.asterix.bad.lang.BADLangExtension;
import org.apache.asterix.bad.lang.BADParserFactory;
import org.apache.asterix.bad.metadata.Channel;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener;
import org.apache.asterix.bad.metadata.PrecompiledJobEventListener.PrecompiledType;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.MetadataException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.expression.CallExpr;
import org.apache.asterix.lang.common.expression.LiteralExpr;
import org.apache.asterix.lang.common.literal.StringLiteral;
import org.apache.asterix.lang.common.statement.DatasetDecl;
import org.apache.asterix.lang.common.statement.IDatasetDetailsDecl;
import org.apache.asterix.lang.common.statement.InsertStatement;
import org.apache.asterix.lang.common.statement.InternalDetailsDecl;
import org.apache.asterix.lang.common.statement.SetStatement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Function;
import org.apache.asterix.om.base.temporal.ADurationParserFactory;
import org.apache.asterix.translator.IStatementExecutor;
import org.apache.asterix.translator.IStatementExecutor.ResultDelivery;
import org.apache.asterix.translator.IStatementExecutor.Stats;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataset.IHyracksDataset;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.dataflow.common.data.parsers.IValueParser;
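/**
 * Handles the BAD "create channel" DDL statement, e.g. (illustrative syntax;
 * the channel and function names here are made up):
 *
 * create repetitive channel nearbyTweetChannel
 * using udfs.NearbyTweetsContainingText@2 period duration("PT10S");
 *
 * Handling the statement (1) creates the channel's subscriptions and results
 * datasets, (2) compiles and schedules the repetitive channel job, and
 * (3) adds the channel entity to the metadata.
 */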
public class CreateChannelStatement implements IExtensionStatement {
private static final Logger LOGGER = Logger.getLogger(CreateChannelStatement.class.getName());
private final Identifier dataverseName;
private final Identifier channelName;
private final FunctionSignature function;
private final CallExpr period;
private String duration;
private InsertStatement channelResultsInsertQuery;
private String subscriptionsTableName;
private String resultsTableName;
private boolean distributed;
public CreateChannelStatement(Identifier dataverseName, Identifier channelName, FunctionSignature function,
Expression period, boolean distributed) {
this.channelName = channelName;
this.dataverseName = dataverseName;
this.function = function;
this.period = (CallExpr) period;
this.duration = "";
this.distributed = distributed;
}
public Identifier getDataverseName() {
return dataverseName;
}
public Identifier getChannelName() {
return channelName;
}
public String getResultsName() {
return resultsTableName;
}
public String getSubscriptionsName() {
return subscriptionsTableName;
}
public String getDuration() {
return duration;
}
public FunctionSignature getFunction() {
return function;
}
public Expression getPeriod() {
return period;
}
public InsertStatement getChannelResultsInsertQuery() {
return channelResultsInsertQuery;
}
@Override
public byte getCategory() {
return Category.DDL;
}
@Override
public <R, T> R accept(ILangVisitor<R, T> visitor, T arg) throws CompilationException {
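//Extension statements are executed through handle() rather than visited by the core language visitors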
return null;
}
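/**
 * Validates the channel definition before anything is created: the channel
 * function must exist in the metadata, and the period must be a duration()
 * call whose argument parses as a valid duration.
 */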
public void initialize(MetadataTransactionContext mdTxnCtx, String subscriptionsTableName, String resultsTableName)
throws MetadataException, HyracksDataException {
Function lookup = MetadataManager.INSTANCE.getFunction(mdTxnCtx, function);
if (lookup == null) {
throw new MetadataException(" Unknown function " + function.getName());
}
if (!period.getFunctionSignature().getName().equals("duration")) {
throw new MetadataException(
"Expected the period argument to be a duration, but got " + period.getFunctionSignature().getName() + ".");
}
duration = ((StringLiteral) ((LiteralExpr) period.getExprList().get(0)).getValue()).getValue();
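//Validate the duration string by parsing it as a duration value; the parsed output is discarded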
IValueParser durationParser = ADurationParserFactory.INSTANCE.createValueParser();
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream outputStream = new DataOutputStream(bos);
durationParser.parse(duration.toCharArray(), 0, duration.toCharArray().length, outputStream);
this.resultsTableName = resultsTableName;
this.subscriptionsTableName = subscriptionsTableName;
}
@Override
public byte getKind() {
return Kind.EXTENSION;
}
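/**
 * Creates the two datasets backing the channel: a subscriptions dataset keyed
 * on the subscription id and a results dataset keyed on the result id, both
 * using the prebuilt BAD types and autogenerated primary keys.
 */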
private void createDatasets(IStatementExecutor statementExecutor, Identifier subscriptionsName,
Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
IHyracksDataset hdc, String dataverse) throws Exception {
Identifier subscriptionsTypeName = new Identifier(BADConstants.ChannelSubscriptionsType);
Identifier resultsTypeName = new Identifier(BADConstants.ChannelResultsType);
//Setup the subscriptions dataset
List<List<String>> partitionFields = new ArrayList<>();
List<Integer> keyIndicators = new ArrayList<>();
keyIndicators.add(0);
List<String> fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.SubscriptionId);
partitionFields.add(fieldNames);
IDatasetDetailsDecl idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
DatasetDecl createSubscriptionsDataset = new DatasetDecl(new Identifier(dataverse), subscriptionsName,
new Identifier(BADConstants.BAD_DATAVERSE_NAME), subscriptionsTypeName, null, null, null, null,
new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
//Setup the results dataset
partitionFields = new ArrayList<>();
fieldNames = new ArrayList<>();
fieldNames.add(BADConstants.ResultId);
partitionFields.add(fieldNames);
idd = new InternalDetailsDecl(partitionFields, keyIndicators, true, null, false);
DatasetDecl createResultsDataset = new DatasetDecl(new Identifier(dataverse), resultsName,
new Identifier(BADConstants.BAD_DATAVERSE_NAME), resultsTypeName, null, null, null, null,
new HashMap<String, String>(), new HashMap<String, String>(), DatasetType.INTERNAL, idd, true);
//Run both statements to create datasets
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createSubscriptionsDataset,
hcc);
metadataProvider.getLocks().reset();
((QueryTranslator) statementExecutor).handleCreateDatasetStatement(metadataProvider, createResultsDataset, hcc);
}
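/**
 * Compiles the repetitive channel job: an insert that joins the subscriptions
 * dataset with the Broker dataset, invokes the channel function once per
 * subscription, and appends timestamped results to the results dataset.
 * Illustrative shape of the generated SQL++ (field and dataset names are
 * taken from BADConstants; the concrete values shown here are assumptions):
 *
 * SET inline_with "false";
 * insert into channelDataverse.myChannelResults as a (
 * with channelExecutionTime as current_datetime()
 * select result, channelExecutionTime,
 * sub.subscriptionId as subscriptionId,
 * current_datetime() as deliveryTime
 * from channelDataverse.myChannelSubscriptions sub,
 * Metadata.Broker b,
 * udfs.myFunction(sub.param0, sub.param1) result
 * where b.BrokerName = sub.BrokerName
 * and b.DataverseName = sub.DataverseName
 * ) returning a;
 */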
private JobSpecification createChannelJob(IStatementExecutor statementExecutor, Identifier subscriptionsName,
Identifier resultsName, MetadataProvider metadataProvider, IHyracksClientConnection hcc,
IHyracksDataset hdc, Stats stats, String dataverse) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("SET inline_with \"false\";\n");
builder.append("insert into " + dataverse + "." + resultsName);
builder.append(" as a (\n" + "with " + BADConstants.ChannelExecutionTime + " as current_datetime() \n");
builder.append("select result, ");
builder.append(BADConstants.ChannelExecutionTime + ", ");
builder.append("sub." + BADConstants.SubscriptionId + " as " + BADConstants.SubscriptionId + ",");
builder.append("current_datetime() as " + BADConstants.DeliveryTime + "\n");
builder.append("from " + dataverse + "." + subscriptionsName + " sub,\n");
builder.append(BADConstants.BAD_DATAVERSE_NAME + "." + BADConstants.BROKER_KEYWORD + " b, \n");
builder.append(function.getNamespace() + "." + function.getName() + "(");
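//Pass each subscription parameter in order as an argument to the channel function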
int i = 0;
for (; i < function.getArity() - 1; i++) {
builder.append("sub.param" + i + ",");
}
builder.append("sub.param" + i + ") result \n");
builder.append("where b." + BADConstants.BrokerName + " = sub." + BADConstants.BrokerName + "\n");
builder.append("and b." + BADConstants.DataverseName + " = sub." + BADConstants.DataverseName + "\n");
builder.append(")");
builder.append(" returning a");
builder.append(";");
BADParserFactory factory = new BADParserFactory();
List<Statement> fStatements = factory.createParser(new StringReader(builder.toString())).parse();
SetStatement ss = (SetStatement) fStatements.get(0);
metadataProvider.getConfig().put(ss.getPropName(), ss.getPropValue());
return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1),
hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null);
}
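/**
 * Schedules periodic execution of the compiled channel job at the interval
 * parsed from the channel duration. When predistributed is true, the job is
 * distributed to the cluster up front so each firing reuses the cached job.
 */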
private void setupExecutorJob(EntityId entityId, JobSpecification channelJobSpec, IHyracksClientConnection hcc,
PrecompiledJobEventListener listener, boolean predistributed) throws Exception {
//TODO: Find a way to fix optimizer tests so we don't need this null check
if (channelJobSpec != null) {
JobId jobId = null;
if (predistributed) {
jobId = hcc.distributeJob(channelJobSpec);
}
ScheduledExecutorService ses = ChannelJobService.startJob(channelJobSpec, EnumSet.noneOf(JobFlag.class),
jobId, hcc, ChannelJobService.findPeriod(duration));
listener.storeDistributedInfo(jobId, ses, null);
}
}
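/**
 * Runs the full create-channel flow inside a single metadata transaction; any
 * failure aborts the transaction and is rethrown as a HyracksDataException.
 */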
@Override
public void handle(IStatementExecutor statementExecutor, MetadataProvider metadataProvider,
IHyracksClientConnection hcc, IHyracksDataset hdc, ResultDelivery resultDelivery, Stats stats,
int resultSetIdCounter) throws HyracksDataException, AlgebricksException {
//This function performs three tasks:
//1. Create datasets for the Channel
//2. Create and run the Channel Job
//3. Create the metadata entry for the channel
//TODO: Figure out how to handle when a subset of the 3 tasks fails
//TODO: The compiled job will break if anything changes on the function or two datasets
// Need to make sure we do proper checking when altering these things
String dataverse = ((QueryTranslator) statementExecutor).getActiveDataverse(dataverseName);
Identifier subscriptionsName = new Identifier(channelName + BADConstants.subscriptionEnding);
Identifier resultsName = new Identifier(channelName + BADConstants.resultsEnding);
EntityId entityId = new EntityId(BADConstants.CHANNEL_EXTENSION_NAME, dataverse, channelName.getValue());
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
ActiveNotificationHandler activeEventHandler =
(ActiveNotificationHandler) appCtx.getActiveNotificationHandler();
PrecompiledJobEventListener listener = (PrecompiledJobEventListener) activeEventHandler.getListener(entityId);
boolean alreadyActive = false;
Channel channel = null;
MetadataTransactionContext mdTxnCtx = null;
try {
mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
metadataProvider.setMetadataTxnContext(mdTxnCtx);
channel = BADLangExtension.getChannel(mdTxnCtx, dataverse, channelName.getValue());
if (channel != null) {
throw new AlgebricksException("A channel with this name " + channelName + " already exists.");
}
if (listener != null) {
alreadyActive = listener.isActive();
}
if (alreadyActive) {
throw new AsterixException("Channel " + channelName + " is already running");
}
initialize(mdTxnCtx, subscriptionsName.getValue(), resultsName.getValue());
channel = new Channel(dataverse, channelName.getValue(), subscriptionsTableName, resultsTableName, function,
duration);
//check if names are available before creating anything
if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
if (MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()) != null) {
throw new AsterixException("The channel name:" + channelName + " is not available.");
}
MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(),
metadataProvider.getDefaultDataverse());
tempMdProvider.getConfig().putAll(metadataProvider.getConfig());
//Create Channel Datasets
createDatasets(statementExecutor, subscriptionsName, resultsName, tempMdProvider, hcc, hdc, dataverse);
tempMdProvider.getLocks().reset();
//Create Channel Internal Job
JobSpecification channelJobSpec = createChannelJob(statementExecutor, subscriptionsName, resultsName,
tempMdProvider, hcc, hdc, stats, dataverse);
// Now we subscribe
if (listener == null) {
List<IDataset> datasets = new ArrayList<>();
datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, subscriptionsName.getValue()));
datasets.add(MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverse, resultsName.getValue()));
//TODO: Add datasets used by channel function
listener = new PrecompiledJobEventListener(appCtx, entityId, PrecompiledType.CHANNEL, datasets, null,
"BadListener");
activeEventHandler.registerListener(listener);
}
setupExecutorJob(entityId, channelJobSpec, hcc, listener, distributed);
MetadataManager.INSTANCE.addEntity(mdTxnCtx, channel);
MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
} catch (Exception e) {
if (mdTxnCtx != null) {
QueryTranslator.abort(e, e, mdTxnCtx);
}
LOGGER.log(Level.WARNING, "Failed to create the channel", e);
throw new HyracksDataException(e);
} finally {
metadataProvider.getLocks().unlock();
}
}
}